From b777c6c567304d89482cf05a58aff2693ea2b0e7 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 2 Feb 2026 20:32:12 -0800 Subject: [PATCH 1/3] added the test infra using gtest, and some tests --- .gitignore | 2 + CMakeLists.txt | 24 +- CMakePresets.json | 182 +++- README.md | 94 +- build.cmd | 18 + build.sh | 40 +- src/ffi_client.cpp | 2 +- src/local_participant.cpp | 2 - src/tests/CMakeLists.txt | 180 ++++ src/tests/integration/test_audio_frame.cpp | 168 ++++ src/tests/integration/test_room.cpp | 162 ++++ src/tests/integration/test_rpc.cpp | 504 +++++++++++ .../integration/test_sdk_initialization.cpp | 78 ++ src/tests/stress/test_audio_frame_stress.cpp | 285 ++++++ src/tests/stress/test_room_stress.cpp | 240 +++++ src/tests/stress/test_rpc_stress.cpp | 847 ++++++++++++++++++ 16 files changed, 2786 insertions(+), 42 deletions(-) create mode 100644 src/tests/CMakeLists.txt create mode 100644 src/tests/integration/test_audio_frame.cpp create mode 100644 src/tests/integration/test_room.cpp create mode 100644 src/tests/integration/test_rpc.cpp create mode 100644 src/tests/integration/test_sdk_initialization.cpp create mode 100644 src/tests/stress/test_audio_frame_stress.cpp create mode 100644 src/tests/stress/test_room_stress.cpp create mode 100644 src/tests/stress/test_rpc_stress.cpp diff --git a/.gitignore b/.gitignore index a0132a0..cc4bdfc 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ build/ build-debug/ build-release/ vcpkg_installed/ +# Generated header +include/livekit/build.h received_green.avif docs/*.bak docs/html/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 8b5ef66..4ee6277 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ set(LIVEKIT_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}) list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") option(LIVEKIT_BUILD_EXAMPLES "Build LiveKit examples" OFF) +option(LIVEKIT_BUILD_TESTS "Build LiveKit tests" OFF) # vcpkg is only used on Windows; Linux/macOS use system package managers if(WIN32) @@ -131,8 +132,8 @@ add_custom_command( target_sources(livekit_proto PRIVATE ${PROTO_SRCS} ${PROTO_HDRS}) -set(GENERATED_INCLUDE_DIR "${LIVEKIT_BINARY_DIR}/generated") -file(MAKE_DIRECTORY "${GENERATED_INCLUDE_DIR}") +# Generate build.h directly into include/livekit/ for simpler include paths +set(GENERATED_BUILD_H "${LIVEKIT_ROOT_DIR}/include/livekit/build.h") set(GIT_COMMIT "unknown") find_package(Git QUIET) @@ -151,7 +152,7 @@ set(GENERATED_COMMENT "This file was auto-generated by CMake on ${BUILD_DATE}. D configure_file( "${LIVEKIT_ROOT_DIR}/build.h.in" - "${GENERATED_INCLUDE_DIR}/build.h" + "${GENERATED_BUILD_H}" @ONLY ) @@ -321,7 +322,6 @@ target_include_directories(livekit ${LIVEKIT_ROOT_DIR}/src ${RUST_ROOT}/livekit-ffi/include ${PROTO_BINARY_DIR} - ${GENERATED_INCLUDE_DIR} ) target_link_libraries(livekit @@ -335,15 +335,12 @@ message(STATUS "Protobuf: version=${Protobuf_VERSION}; protoc=${Protobuf_PROTOC_ add_dependencies(livekit build_rust_ffi) if(LIVEKIT_IS_TOPLEVEL) - # Copy public headers to build/include + # Copy public headers to build/include (build.h is already in include/livekit/) add_custom_command( TARGET livekit POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_directory ${LIVEKIT_ROOT_DIR}/include ${CMAKE_BINARY_DIR}/include - COMMAND ${CMAKE_COMMAND} -E copy - ${GENERATED_INCLUDE_DIR}/build.h - ${CMAKE_BINARY_DIR}/include/livekit/build.h COMMENT "Copying public headers to build/include" ) @@ -591,17 +588,12 @@ if(APPLE) endif() -# Install public headers +# Install public headers (includes generated build.h which is now in include/livekit/) install(DIRECTORY "${CMAKE_SOURCE_DIR}/include/" DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}" FILES_MATCHING PATTERN "*.h" PATTERN "*.hpp" ) -# Install generated build.h if it is part of the public headers -install(FILES "${GENERATED_INCLUDE_DIR}/build.h" - DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/livekit" -) - # ---- Create Config + Version + Targets for find_package(LiveKit CONFIG) ---- # cmake/LiveKitConfig.cmake.in must exist in your repo @@ -642,6 +634,10 @@ if(LIVEKIT_BUILD_EXAMPLES) add_subdirectory(examples) endif() +if(LIVEKIT_BUILD_TESTS) + add_subdirectory(src/tests) +endif() + add_custom_target(clean_generated COMMAND ${CMAKE_COMMAND} -E rm -rf "${PROTO_BINARY_DIR}" COMMENT "Clean generated protobuf files" diff --git a/CMakePresets.json b/CMakePresets.json index a15603a..bf03518 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -66,7 +66,9 @@ "inherits": "windows-base", "binaryDir": "${sourceDir}/build-release", "cacheVariables": { - "CMAKE_BUILD_TYPE": "Release" + "CMAKE_BUILD_TYPE": "Release", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -76,7 +78,9 @@ "inherits": "windows-base", "binaryDir": "${sourceDir}/build-debug", "cacheVariables": { - "CMAKE_BUILD_TYPE": "Debug" + "CMAKE_BUILD_TYPE": "Debug", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -88,6 +92,7 @@ "cacheVariables": { "CMAKE_BUILD_TYPE": "Release", "LIVEKIT_BUILD_EXAMPLES": "ON", + "LIVEKIT_BUILD_TESTS": "OFF", "VCPKG_MANIFEST_FEATURES": "examples" } }, @@ -100,6 +105,7 @@ "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug", "LIVEKIT_BUILD_EXAMPLES": "ON", + "LIVEKIT_BUILD_TESTS": "OFF", "VCPKG_MANIFEST_FEATURES": "examples" } }, @@ -110,7 +116,9 @@ "inherits": "linux-base", "binaryDir": "${sourceDir}/build-release", "cacheVariables": { - "CMAKE_BUILD_TYPE": "Release" + "CMAKE_BUILD_TYPE": "Release", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -120,7 +128,9 @@ "inherits": "linux-base", "binaryDir": "${sourceDir}/build-debug", "cacheVariables": { - "CMAKE_BUILD_TYPE": "Debug" + "CMAKE_BUILD_TYPE": "Debug", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -131,7 +141,8 @@ "binaryDir": "${sourceDir}/build-release", "cacheVariables": { "CMAKE_BUILD_TYPE": "Release", - "LIVEKIT_BUILD_EXAMPLES": "ON" + "LIVEKIT_BUILD_EXAMPLES": "ON", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -142,7 +153,8 @@ "binaryDir": "${sourceDir}/build-debug", "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug", - "LIVEKIT_BUILD_EXAMPLES": "ON" + "LIVEKIT_BUILD_EXAMPLES": "ON", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -152,7 +164,9 @@ "inherits": "macos-base", "binaryDir": "${sourceDir}/build-release", "cacheVariables": { - "CMAKE_BUILD_TYPE": "Release" + "CMAKE_BUILD_TYPE": "Release", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -162,7 +176,9 @@ "inherits": "macos-base", "binaryDir": "${sourceDir}/build-debug", "cacheVariables": { - "CMAKE_BUILD_TYPE": "Debug" + "CMAKE_BUILD_TYPE": "Debug", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -173,7 +189,8 @@ "binaryDir": "${sourceDir}/build-release", "cacheVariables": { "CMAKE_BUILD_TYPE": "Release", - "LIVEKIT_BUILD_EXAMPLES": "ON" + "LIVEKIT_BUILD_EXAMPLES": "ON", + "LIVEKIT_BUILD_TESTS": "OFF" } }, { @@ -184,7 +201,80 @@ "binaryDir": "${sourceDir}/build-debug", "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug", - "LIVEKIT_BUILD_EXAMPLES": "ON" + "LIVEKIT_BUILD_EXAMPLES": "ON", + "LIVEKIT_BUILD_TESTS": "OFF" + } + }, + { + "name": "windows-release-tests", + "displayName": "Windows x64 Release with Tests", + "description": "Build for Windows x64 Release with test suite", + "inherits": "windows-base", + "binaryDir": "${sourceDir}/build-release", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Release", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "ON" + } + }, + { + "name": "windows-debug-tests", + "displayName": "Windows x64 Debug with Tests", + "description": "Build for Windows x64 Debug with test suite", + "inherits": "windows-base", + "binaryDir": "${sourceDir}/build-debug", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "ON" + } + }, + { + "name": "linux-release-tests", + "displayName": "Linux Release with Tests", + "description": "Build for Linux Release with test suite", + "inherits": "linux-base", + "binaryDir": "${sourceDir}/build-release", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Release", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "ON" + } + }, + { + "name": "linux-debug-tests", + "displayName": "Linux Debug with Tests", + "description": "Build for Linux Debug with test suite", + "inherits": "linux-base", + "binaryDir": "${sourceDir}/build-debug", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "ON" + } + }, + { + "name": "macos-release-tests", + "displayName": "macOS Release with Tests", + "description": "Build for macOS Release with test suite", + "inherits": "macos-base", + "binaryDir": "${sourceDir}/build-release", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Release", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "ON" + } + }, + { + "name": "macos-debug-tests", + "displayName": "macOS Debug with Tests", + "description": "Build for macOS Debug with test suite", + "inherits": "macos-base", + "binaryDir": "${sourceDir}/build-debug", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "LIVEKIT_BUILD_EXAMPLES": "OFF", + "LIVEKIT_BUILD_TESTS": "ON" } } ], @@ -240,6 +330,78 @@ { "name": "macos-debug-examples", "configurePreset": "macos-debug-examples" + }, + { + "name": "windows-release-tests", + "configurePreset": "windows-release-tests", + "configuration": "Release" + }, + { + "name": "windows-debug-tests", + "configurePreset": "windows-debug-tests", + "configuration": "Debug" + }, + { + "name": "linux-release-tests", + "configurePreset": "linux-release-tests" + }, + { + "name": "linux-debug-tests", + "configurePreset": "linux-debug-tests" + }, + { + "name": "macos-release-tests", + "configurePreset": "macos-release-tests" + }, + { + "name": "macos-debug-tests", + "configurePreset": "macos-debug-tests" + } + ], + "testPresets": [ + { + "name": "windows-release-tests", + "configurePreset": "windows-release-tests", + "configuration": "Release", + "output": { + "outputOnFailure": true + } + }, + { + "name": "windows-debug-tests", + "configurePreset": "windows-debug-tests", + "configuration": "Debug", + "output": { + "outputOnFailure": true + } + }, + { + "name": "linux-release-tests", + "configurePreset": "linux-release-tests", + "output": { + "outputOnFailure": true + } + }, + { + "name": "linux-debug-tests", + "configurePreset": "linux-debug-tests", + "output": { + "outputOnFailure": true + } + }, + { + "name": "macos-release-tests", + "configurePreset": "macos-release-tests", + "output": { + "outputOnFailure": true + } + }, + { + "name": "macos-debug-tests", + "configurePreset": "macos-debug-tests", + "output": { + "outputOnFailure": true + } } ] } diff --git a/README.md b/README.md index 0cc98c1..574442e 100644 --- a/README.md +++ b/README.md @@ -45,11 +45,12 @@ git submodule update --init --recursive **Linux/macOS:** ```bash -./build.sh clean # Clean CMake build artifacts -./build.sh clean-all # Deep clean (C++ + Rust + generated files) -./build.sh debug # Build Debug version -./build.sh release # Build Release version -./build.sh verbose # Verbose build output +./build.sh clean # Clean CMake build artifacts +./build.sh clean-all # Deep clean (C++ + Rust + generated files) +./build.sh debug # Build Debug version +./build.sh release # Build Release version +./build.sh debug-tests # Build Debug with tests +./build.sh release-tests # Build Release with tests ``` **Windows** ```bash @@ -69,11 +70,12 @@ You must install protobuf via vcpkg (so CMake can find ProtobufConfig.cmake and **Windows:** ```powershell -.\build.cmd clean # Clean CMake build artifacts -.\build.cmd clean-all # Deep clean (C++ + Rust + generated files) -.\build.cmd debug # Build Debug version -.\build.cmd release # Build Release version -.\build.cmd verbose # Verbose build output +.\build.cmd clean # Clean CMake build artifacts +.\build.cmd clean-all # Deep clean (C++ + Rust + generated files) +.\build.cmd debug # Build Debug version +.\build.cmd release # Build Release version +.\build.cmd debug-tests # Build Debug with tests +.\build.cmd release-tests # Build Release with tests ``` ### Advanced Build (Using CMake Presets) @@ -205,6 +207,78 @@ On another terminal or computer, start the sender - Registers handlers for text and file streams, logs stream events, computes one-way latency, and saves the received file locally. +## 🧪 Integration & Stress Tests + +The SDK includes integration and stress tests using Google Test (gtest). + +### Build Tests + +**Linux/macOS:** +```bash +./build.sh debug-tests # Build Debug with tests +./build.sh release-tests # Build Release with tests +``` + +**Windows:** +```powershell +.\build.cmd debug-tests +.\build.cmd release-tests +``` + +### Run Tests + +After building, run tests using ctest or directly: + +```bash +# Run all tests via ctest +cd build-debug +ctest --output-on-failure + +# Or run test executables directly +./build-debug/bin/livekit_integration_tests +./build-debug/bin/livekit_stress_tests + +# Run specific test suites +./build-debug/bin/livekit_integration_tests --gtest_filter="*Rpc*" +./build-debug/bin/livekit_stress_tests --gtest_filter="*MaxPayloadStress*" +``` + +### Test Types + +| Executable | Description | +|------------|-------------| +| `livekit_integration_tests` | Quick tests (~1-2 minutes) for SDK functionality | +| `livekit_stress_tests` | Long-running tests (configurable, default 1 hour) | + +### RPC Test Environment Variables + +RPC integration and stress tests require a LiveKit server and two participant tokens: + +```bash +# Required +export LIVEKIT_URL="wss://your-server.livekit.cloud" +export LIVEKIT_CALLER_TOKEN="" +export LIVEKIT_RECEIVER_TOKEN="" + +# Optional (for stress tests) +export RPC_STRESS_DURATION_SECONDS=3600 # Test duration (default: 1 hour) +export RPC_STRESS_CALLER_THREADS=4 # Concurrent caller threads (default: 4) +``` + +**Generate tokens for RPC tests:** +```bash +lk token create -r test -i rpc-caller --join --valid-for 99999h --dev --room=rpc-test-room +lk token create -r test -i rpc-receiver --join --valid-for 99999h --dev --room=rpc-test-room +``` + +### Test Coverage + +- **SDK Initialization**: Initialize/shutdown lifecycle +- **Room**: Room creation, options, connection +- **Audio Frame**: Frame creation, manipulation, edge cases +- **RPC**: Round-trip calls, max payload (15KB), timeouts, errors, concurrent calls +- **Stress Tests**: High throughput, bidirectional RPC, memory pressure + ## 🧰 Recommended Setup ### macOS ```bash diff --git a/build.cmd b/build.cmd index d67e263..d750e39 100644 --- a/build.cmd +++ b/build.cmd @@ -132,6 +132,20 @@ if "%CMD%"=="release-examples" ( goto configure_build ) +if "%CMD%"=="debug-tests" ( + set "BUILD_TYPE=Debug" + set "PRESET=windows-debug-tests" + set "BUILD_DIR=%PROJECT_ROOT%\build-debug" + goto configure_build +) + +if "%CMD%"=="release-tests" ( + set "BUILD_TYPE=Release" + set "PRESET=windows-release-tests" + set "BUILD_DIR=%PROJECT_ROOT%\build-release" + goto configure_build +) + if "%CMD%"=="clean" goto clean if "%CMD%"=="clean-all" goto clean_all @@ -144,8 +158,10 @@ echo. echo Commands: echo debug Configure + build Debug version (build-debug/) echo debug-examples Configure + build Debug version with examples +echo debug-tests Configure + build Debug version with tests echo release Configure + build Release version (build-release/) echo release-examples Configure + build Release version with examples +echo release-tests Configure + build Release version with tests echo clean Clean both Debug and Release build directories echo clean-all Full clean (build dirs + Rust targets) echo help Show this help @@ -154,6 +170,8 @@ echo Examples: echo build.cmd debug echo build.cmd release echo build.cmd release-examples +echo build.cmd debug-tests +echo build.cmd release-tests echo build.cmd clean echo build.cmd clean-all goto :eof diff --git a/build.sh b/build.sh index 981a3f8..0fb1a70 100755 --- a/build.sh +++ b/build.sh @@ -33,8 +33,10 @@ Usage: Commands: debug Configure + build Debug version (build-debug/) debug-examples Configure + build Debug version with examples + debug-tests Configure + build Debug version with tests release Configure + build Release version (build-release/) release-examples Configure + build Release version with examples + release-tests Configure + build Release version with tests clean Clean both Debug and Release build directories clean-all Full clean (build dirs + Rust targets) help Show this help message @@ -56,6 +58,8 @@ Examples: ./build.sh release ./build.sh release-examples ./build.sh debug + ./build.sh debug-tests + ./build.sh release-tests ./build.sh clean ./build.sh clean-all EOF @@ -291,19 +295,19 @@ clean() { clean_all() { echo "==> Running full clean-all (C++ + Rust)..." - + echo "Removing build-debug directory..." rm -rf "${PROJECT_ROOT}/build-debug" || true - + echo "Removing build-release directory..." rm -rf "${PROJECT_ROOT}/build-release" || true - + echo "Removing Rust debug artifacts..." rm -rf "${PROJECT_ROOT}/client-sdk-rust/target/debug" || true - + echo "Removing Rust release artifacts..." rm -rf "${PROJECT_ROOT}/client-sdk-rust/target/release" || true - + echo "==> Clean-all complete." } @@ -367,6 +371,32 @@ case "${cmd}" in fi fi ;; + debug-tests) + BUILD_TYPE="Debug" + BUILD_DIR="${PROJECT_ROOT}/build-debug" + PRESET="${OS_TYPE}-debug-tests" + configure + build + if [[ "${DO_BUNDLE}" == "1" ]]; then + install_bundle + if [[ "${DO_ARCHIVE}" == "1" ]]; then + archive_bundle + fi + fi + ;; + release-tests) + BUILD_TYPE="Release" + BUILD_DIR="${PROJECT_ROOT}/build-release" + PRESET="${OS_TYPE}-release-tests" + configure + build + if [[ "${DO_BUNDLE}" == "1" ]]; then + install_bundle + if [[ "${DO_ARCHIVE}" == "1" ]]; then + archive_bundle + fi + fi + ;; clean) clean ;; diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 4f2cbe7..b59ffb6 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -17,10 +17,10 @@ #include #include -#include "build.h" #include "e2ee.pb.h" #include "ffi.pb.h" #include "ffi_client.h" +#include "livekit/build.h" #include "livekit/e2ee.h" #include "livekit/ffi_handle.h" #include "livekit/room.h" diff --git a/src/local_participant.cpp b/src/local_participant.cpp index 7ec3c76..0ffc472 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -282,7 +282,6 @@ void LocalParticipant::handleRpcMethodInvocation( const std::string &payload, double response_timeout_sec) { std::optional response_error; std::optional response_payload; - std::cout << "handleRpcMethodInvocation \n"; RpcInvocationData params{request_id, caller_identity, payload, response_timeout_sec}; auto it = rpc_handlers_.find(method); @@ -317,7 +316,6 @@ void LocalParticipant::handleRpcMethodInvocation( if (response_payload.has_value()) { msg->set_payload(*response_payload); } - std::cout << "handleRpcMethodInvocation sendrequest \n"; FfiClient::instance().sendRequest(req); } diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt new file mode 100644 index 0000000..bf611ed --- /dev/null +++ b/src/tests/CMakeLists.txt @@ -0,0 +1,180 @@ +cmake_minimum_required(VERSION 3.20) + +# ============================================================================ +# Google Test Setup via FetchContent +# ============================================================================ + +include(FetchContent) + +FetchContent_Declare( + googletest + GIT_REPOSITORY https://github.com/google/googletest.git + GIT_TAG v1.14.0 +) + +# Prevent overriding the parent project's compiler/linker settings on Windows +set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + +# Don't install gtest when installing this project +set(INSTALL_GTEST OFF CACHE BOOL "" FORCE) + +FetchContent_MakeAvailable(googletest) + +# Enable CTest +enable_testing() +include(GoogleTest) + +# ============================================================================ +# Integration Tests +# ============================================================================ + +# Collect all integration test sources +file(GLOB INTEGRATION_TEST_SOURCES + "${CMAKE_CURRENT_SOURCE_DIR}/integration/*.cpp" +) + +if(INTEGRATION_TEST_SOURCES) + add_executable(livekit_integration_tests + ${INTEGRATION_TEST_SOURCES} + ) + + target_link_libraries(livekit_integration_tests + PRIVATE + livekit + GTest::gtest_main + GTest::gmock + ) + + target_include_directories(livekit_integration_tests + PRIVATE + ${LIVEKIT_ROOT_DIR}/include + ${LIVEKIT_ROOT_DIR}/src + ) + + # Copy shared libraries to test executable directory + if(WIN32) + add_custom_command(TARGET livekit_integration_tests POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $ + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$/livekit_ffi.dll" + $ + COMMENT "Copying DLLs to integration test directory" + ) + elseif(APPLE) + add_custom_command(TARGET livekit_integration_tests POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $ + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$/liblivekit_ffi.dylib" + $ + COMMENT "Copying dylibs to integration test directory" + ) + else() + add_custom_command(TARGET livekit_integration_tests POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $ + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$/liblivekit_ffi.so" + $ + COMMENT "Copying shared libraries to integration test directory" + ) + endif() + + # Register tests with CTest + gtest_discover_tests(livekit_integration_tests + WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} + PROPERTIES + LABELS "integration" + ) +endif() + +# ============================================================================ +# Stress Tests +# ============================================================================ + +# Collect all stress test sources +file(GLOB STRESS_TEST_SOURCES + "${CMAKE_CURRENT_SOURCE_DIR}/stress/*.cpp" +) + +if(STRESS_TEST_SOURCES) + add_executable(livekit_stress_tests + ${STRESS_TEST_SOURCES} + ) + + target_link_libraries(livekit_stress_tests + PRIVATE + livekit + GTest::gtest_main + GTest::gmock + ) + + target_include_directories(livekit_stress_tests + PRIVATE + ${LIVEKIT_ROOT_DIR}/include + ${LIVEKIT_ROOT_DIR}/src + ) + + # Copy shared libraries to test executable directory + if(WIN32) + add_custom_command(TARGET livekit_stress_tests POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $ + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$/livekit_ffi.dll" + $ + COMMENT "Copying DLLs to stress test directory" + ) + elseif(APPLE) + add_custom_command(TARGET livekit_stress_tests POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $ + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$/liblivekit_ffi.dylib" + $ + COMMENT "Copying dylibs to stress test directory" + ) + else() + add_custom_command(TARGET livekit_stress_tests POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $ + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$/liblivekit_ffi.so" + $ + COMMENT "Copying shared libraries to stress test directory" + ) + endif() + + # Register tests with CTest (longer timeout for stress tests) + gtest_discover_tests(livekit_stress_tests + WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} + PROPERTIES + LABELS "stress" + TIMEOUT 300 + ) +endif() + +# ============================================================================ +# Combined test target +# ============================================================================ + +add_custom_target(run_all_tests + COMMAND ${CMAKE_CTEST_COMMAND} --output-on-failure + WORKING_DIRECTORY ${CMAKE_BINARY_DIR} + COMMENT "Running all tests" +) + +if(TARGET livekit_integration_tests) + add_dependencies(run_all_tests livekit_integration_tests) +endif() + +if(TARGET livekit_stress_tests) + add_dependencies(run_all_tests livekit_stress_tests) +endif() diff --git a/src/tests/integration/test_audio_frame.cpp b/src/tests/integration/test_audio_frame.cpp new file mode 100644 index 0000000..e444def --- /dev/null +++ b/src/tests/integration/test_audio_frame.cpp @@ -0,0 +1,168 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +namespace livekit { +namespace test { + +class AudioFrameTest : public ::testing::Test { +protected: + void SetUp() override { livekit::initialize(livekit::LogSink::kConsole); } + + void TearDown() override { livekit::shutdown(); } +}; + +TEST_F(AudioFrameTest, CreateWithValidData) { + std::vector data(960, 0); // 10ms at 48kHz mono + AudioFrame frame(data, 48000, 1, 960); + + EXPECT_EQ(frame.sample_rate(), 48000); + EXPECT_EQ(frame.num_channels(), 1); + EXPECT_EQ(frame.samples_per_channel(), 960); + EXPECT_EQ(frame.total_samples(), 960); +} + +TEST_F(AudioFrameTest, CreateStereoFrame) { + std::vector data(1920, 0); // 10ms at 48kHz stereo + AudioFrame frame(data, 48000, 2, 960); + + EXPECT_EQ(frame.sample_rate(), 48000); + EXPECT_EQ(frame.num_channels(), 2); + EXPECT_EQ(frame.samples_per_channel(), 960); + EXPECT_EQ(frame.total_samples(), 1920); +} + +TEST_F(AudioFrameTest, CreateUsingStaticMethod) { + AudioFrame frame = AudioFrame::create(48000, 2, 960); + + EXPECT_EQ(frame.sample_rate(), 48000); + EXPECT_EQ(frame.num_channels(), 2); + EXPECT_EQ(frame.samples_per_channel(), 960); + EXPECT_EQ(frame.total_samples(), 1920); + + // Created frame should be zero-initialized + const auto &samples = frame.data(); + for (const auto &sample : samples) { + EXPECT_EQ(sample, 0); + } +} + +TEST_F(AudioFrameTest, Duration10ms) { + AudioFrame frame = AudioFrame::create(48000, 1, 480); + EXPECT_DOUBLE_EQ(frame.duration(), 0.01); // 10ms +} + +TEST_F(AudioFrameTest, Duration20ms) { + AudioFrame frame = AudioFrame::create(48000, 1, 960); + EXPECT_DOUBLE_EQ(frame.duration(), 0.02); // 20ms +} + +TEST_F(AudioFrameTest, DurationVarious) { + // 16kHz sample rate, 160 samples = 10ms + AudioFrame frame16k = AudioFrame::create(16000, 1, 160); + EXPECT_DOUBLE_EQ(frame16k.duration(), 0.01); + + // 44.1kHz sample rate, 441 samples = 10ms + AudioFrame frame44k = AudioFrame::create(44100, 1, 441); + EXPECT_DOUBLE_EQ(frame44k.duration(), 0.01); +} + +TEST_F(AudioFrameTest, DataAccessMutable) { + AudioFrame frame = AudioFrame::create(48000, 1, 480); + + // Modify data + auto &data = frame.data(); + data[0] = 1000; + data[1] = -1000; + + // Verify changes persisted + EXPECT_EQ(frame.data()[0], 1000); + EXPECT_EQ(frame.data()[1], -1000); +} + +TEST_F(AudioFrameTest, DataAccessConst) { + std::vector original_data = {100, 200, 300, 400}; + AudioFrame frame(original_data, 48000, 1, 4); + + const AudioFrame &const_frame = frame; + const auto &data = const_frame.data(); + + EXPECT_EQ(data[0], 100); + EXPECT_EQ(data[1], 200); + EXPECT_EQ(data[2], 300); + EXPECT_EQ(data[3], 400); +} + +TEST_F(AudioFrameTest, ToString) { + AudioFrame frame = AudioFrame::create(48000, 2, 960); + std::string desc = frame.to_string(); + + // Should contain relevant info + EXPECT_FALSE(desc.empty()); + EXPECT_NE(desc.find("48000"), std::string::npos); +} + +TEST_F(AudioFrameTest, DefaultConstructor) { + AudioFrame frame; + + // Default constructed frame should have zero values + EXPECT_EQ(frame.sample_rate(), 0); + EXPECT_EQ(frame.num_channels(), 0); + EXPECT_EQ(frame.samples_per_channel(), 0); + EXPECT_TRUE(frame.data().empty()); +} + +TEST_F(AudioFrameTest, CopySemantics) { + std::vector data = {1, 2, 3, 4}; + AudioFrame original(data, 48000, 1, 4); + + AudioFrame copy = original; + + EXPECT_EQ(copy.sample_rate(), original.sample_rate()); + EXPECT_EQ(copy.num_channels(), original.num_channels()); + EXPECT_EQ(copy.samples_per_channel(), original.samples_per_channel()); + EXPECT_EQ(copy.data(), original.data()); + + // Modifying copy should not affect original + copy.data()[0] = 999; + EXPECT_EQ(original.data()[0], 1); + EXPECT_EQ(copy.data()[0], 999); +} + +TEST_F(AudioFrameTest, MoveSemantics) { + std::vector data = {1, 2, 3, 4}; + AudioFrame original(data, 48000, 1, 4); + + AudioFrame moved = std::move(original); + + EXPECT_EQ(moved.sample_rate(), 48000); + EXPECT_EQ(moved.num_channels(), 1); + EXPECT_EQ(moved.samples_per_channel(), 4); + EXPECT_EQ(moved.data().size(), 4); +} + +TEST_F(AudioFrameTest, InvalidDataSizeThrows) { + // Data size doesn't match num_channels * samples_per_channel + std::vector data(100, 0); + + EXPECT_THROW(AudioFrame(data, 48000, 2, 960), std::invalid_argument); +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/integration/test_room.cpp b/src/tests/integration/test_room.cpp new file mode 100644 index 0000000..fca83a0 --- /dev/null +++ b/src/tests/integration/test_room.cpp @@ -0,0 +1,162 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace livekit { +namespace test { + +class RoomTest : public ::testing::Test { +protected: + void SetUp() override { livekit::initialize(livekit::LogSink::kConsole); } + + void TearDown() override { livekit::shutdown(); } +}; + +TEST_F(RoomTest, CreateRoom) { + Room room; + // Room should be created without issues + EXPECT_EQ(room.localParticipant(), nullptr) + << "Local participant should be null before connect"; +} + +TEST_F(RoomTest, RoomOptionsDefaults) { + RoomOptions options; + + EXPECT_TRUE(options.auto_subscribe) + << "auto_subscribe should default to true"; + EXPECT_FALSE(options.dynacast) << "dynacast should default to false"; + EXPECT_FALSE(options.rtc_config.has_value()) + << "rtc_config should not have a value by default"; + EXPECT_FALSE(options.encryption.has_value()) + << "encryption should not have a value by default"; +} + +TEST_F(RoomTest, RtcConfigDefaults) { + RtcConfig config; + + EXPECT_EQ(config.ice_transport_type, 0); + EXPECT_EQ(config.continual_gathering_policy, 0); + EXPECT_TRUE(config.ice_servers.empty()); +} + +TEST_F(RoomTest, IceServerConfiguration) { + IceServer server; + server.url = "stun:stun.l.google.com:19302"; + server.username = "user"; + server.credential = "pass"; + + EXPECT_EQ(server.url, "stun:stun.l.google.com:19302"); + EXPECT_EQ(server.username, "user"); + EXPECT_EQ(server.credential, "pass"); +} + +TEST_F(RoomTest, RoomWithCustomRtcConfig) { + RoomOptions options; + options.auto_subscribe = false; + options.dynacast = true; + + RtcConfig rtc_config; + rtc_config.ice_servers.push_back({"stun:stun.l.google.com:19302", "", ""}); + rtc_config.ice_servers.push_back( + {"turn:turn.example.com:3478", "user", "pass"}); + + options.rtc_config = rtc_config; + + EXPECT_FALSE(options.auto_subscribe); + EXPECT_TRUE(options.dynacast); + EXPECT_TRUE(options.rtc_config.has_value()); + EXPECT_EQ(options.rtc_config->ice_servers.size(), 2); +} + +TEST_F(RoomTest, RemoteParticipantsEmptyBeforeConnect) { + Room room; + auto participants = room.remoteParticipants(); + EXPECT_TRUE(participants.empty()) + << "Remote participants should be empty before connect"; +} + +TEST_F(RoomTest, RemoteParticipantLookupBeforeConnect) { + Room room; + auto participant = room.remoteParticipant("nonexistent"); + EXPECT_EQ(participant, nullptr) + << "Looking up participant before connect should return nullptr"; +} + +// Server-dependent tests - require LIVEKIT_URL and LIVEKIT_TOKEN env vars +class RoomServerTest : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogSink::kConsole); + + const char *url_env = std::getenv("LIVEKIT_URL"); + const char *token_env = std::getenv("LIVEKIT_TOKEN"); + + if (url_env && token_env) { + server_url_ = url_env; + token_ = token_env; + server_available_ = true; + } + } + + void TearDown() override { livekit::shutdown(); } + + bool server_available_ = false; + std::string server_url_; + std::string token_; +}; + +TEST_F(RoomServerTest, ConnectToServer) { + if (!server_available_) { + GTEST_SKIP() << "LIVEKIT_URL and LIVEKIT_TOKEN not set, skipping server " + "connection test"; + } + + Room room; + RoomOptions options; + + bool connected = room.Connect(server_url_, token_, options); + EXPECT_TRUE(connected) << "Should connect to server successfully"; + + if (connected) { + EXPECT_NE(room.localParticipant(), nullptr) + << "Local participant should exist after connect"; + } +} + +TEST_F(RoomServerTest, ConnectWithInvalidToken) { + if (!server_available_) { + GTEST_SKIP() << "LIVEKIT_URL not set, skipping invalid token test"; + } + + Room room; + RoomOptions options; + + bool connected = room.Connect(server_url_, "invalid_token", options); + EXPECT_FALSE(connected) << "Should fail to connect with invalid token"; +} + +TEST_F(RoomServerTest, ConnectWithInvalidUrl) { + Room room; + RoomOptions options; + + bool connected = room.Connect("wss://invalid.example.com", "token", options); + EXPECT_FALSE(connected) << "Should fail to connect to invalid URL"; +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/integration/test_rpc.cpp b/src/tests/integration/test_rpc.cpp new file mode 100644 index 0000000..547f1da --- /dev/null +++ b/src/tests/integration/test_rpc.cpp @@ -0,0 +1,504 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +using namespace std::chrono_literals; + +// Maximum RPC payload size (15KB) +constexpr size_t kMaxRpcPayloadSize = 15 * 1024; + +// Test configuration from environment variables +struct RpcTestConfig { + std::string url; + std::string caller_token; + std::string receiver_token; + bool available = false; + + static RpcTestConfig fromEnv() { + RpcTestConfig config; + const char *url = std::getenv("LIVEKIT_URL"); + const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + + if (url && caller_token && receiver_token) { + config.url = url; + config.caller_token = caller_token; + config.receiver_token = receiver_token; + config.available = true; + } + return config; + } +}; + +// Generate a random string of specified size +std::string generateRandomPayload(size_t size) { + static const char charset[] = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution<> dis(0, sizeof(charset) - 2); + + std::string result; + result.reserve(size); + for (size_t i = 0; i < size; ++i) { + result += charset[dis(gen)]; + } + return result; +} + +// Wait for a remote participant to appear +bool waitForParticipant(Room *room, const std::string &identity, + std::chrono::milliseconds timeout) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + if (room->remoteParticipant(identity) != nullptr) { + return true; + } + std::this_thread::sleep_for(100ms); + } + return false; +} + +class RpcIntegrationTest : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogSink::kConsole); + config_ = RpcTestConfig::fromEnv(); + } + + void TearDown() override { livekit::shutdown(); } + + RpcTestConfig config_; +}; + +// Test basic RPC round-trip +TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + // Create receiver room + auto receiver_room = std::make_unique(); + RoomOptions receiver_options; + receiver_options.auto_subscribe = true; + + bool receiver_connected = receiver_room->Connect( + config_.url, config_.receiver_token, receiver_options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + // Register RPC handler on receiver + std::atomic rpc_calls_received{0}; + receiver_room->localParticipant()->registerRpcMethod( + "echo", + [&rpc_calls_received]( + const RpcInvocationData &data) -> std::optional { + rpc_calls_received++; + return "echo: " + data.payload; + }); + + // Create caller room + auto caller_room = std::make_unique(); + RoomOptions caller_options; + caller_options.auto_subscribe = true; + + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, caller_options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + // Wait for receiver to be visible to caller + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + // Perform RPC call + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "echo", "hello world", 10.0); + + EXPECT_EQ(response, "echo: hello world"); + EXPECT_EQ(rpc_calls_received.load(), 1); + + // Cleanup + receiver_room->localParticipant()->unregisterRpcMethod("echo"); + caller_room.reset(); + receiver_room.reset(); +} + +// Test maximum payload size (15KB) +TEST_F(RpcIntegrationTest, MaxPayloadSize) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + // Register handler that echoes payload size + receiver_room->localParticipant()->registerRpcMethod( + "payload-size", + [](const RpcInvocationData &data) -> std::optional { + return std::to_string(data.payload.size()); + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + // Test with max payload size (15KB) + std::string max_payload = generateRandomPayload(kMaxRpcPayloadSize); + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "payload-size", max_payload, 30.0); + + EXPECT_EQ(response, std::to_string(kMaxRpcPayloadSize)); + + receiver_room->localParticipant()->unregisterRpcMethod("payload-size"); + caller_room.reset(); + receiver_room.reset(); +} + +// Test RPC timeout +TEST_F(RpcIntegrationTest, RpcTimeout) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + // Register handler that takes too long + receiver_room->localParticipant()->registerRpcMethod( + "slow-method", + [](const RpcInvocationData &) -> std::optional { + std::this_thread::sleep_for(10s); + return "done"; + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + // Call with short timeout - should fail + EXPECT_THROW( + { + caller_room->localParticipant()->performRpc(receiver_identity, + "slow-method", "", 2.0); + }, + RpcError); + + receiver_room->localParticipant()->unregisterRpcMethod("slow-method"); + caller_room.reset(); + receiver_room.reset(); +} + +// Test RPC with unsupported method +TEST_F(RpcIntegrationTest, UnsupportedMethod) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + // Call unregistered method + try { + caller_room->localParticipant()->performRpc(receiver_identity, + "nonexistent-method", "", 5.0); + FAIL() << "Expected RpcError for unsupported method"; + } catch (const RpcError &e) { + EXPECT_EQ(static_cast(e.code()), + RpcError::ErrorCode::UNSUPPORTED_METHOD); + } + + caller_room.reset(); + receiver_room.reset(); +} + +// Test RPC with application error +TEST_F(RpcIntegrationTest, ApplicationError) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + // Register handler that throws an error + receiver_room->localParticipant()->registerRpcMethod( + "error-method", + [](const RpcInvocationData &) -> std::optional { + throw std::runtime_error("intentional error"); + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + try { + caller_room->localParticipant()->performRpc(receiver_identity, + "error-method", "", 5.0); + FAIL() << "Expected RpcError for application error"; + } catch (const RpcError &e) { + EXPECT_EQ(static_cast(e.code()), + RpcError::ErrorCode::APPLICATION_ERROR); + } + + receiver_room->localParticipant()->unregisterRpcMethod("error-method"); + caller_room.reset(); + receiver_room.reset(); +} + +// Test multiple concurrent RPC calls +TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic calls_processed{0}; + receiver_room->localParticipant()->registerRpcMethod( + "counter", + [&calls_processed]( + const RpcInvocationData &data) -> std::optional { + int id = std::stoi(data.payload); + calls_processed++; + std::this_thread::sleep_for(100ms); // Simulate some work + return std::to_string(id * 2); + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + const int num_concurrent_calls = 10; + std::vector threads; + std::atomic successful_calls{0}; + + for (int i = 0; i < num_concurrent_calls; ++i) { + threads.emplace_back([&, i]() { + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "counter", std::to_string(i), 30.0); + int expected = i * 2; + if (std::stoi(response) == expected) { + successful_calls++; + } + } catch (const std::exception &e) { + std::cerr << "RPC call " << i << " failed: " << e.what() << std::endl; + } + }); + } + + for (auto &t : threads) { + t.join(); + } + + EXPECT_EQ(successful_calls.load(), num_concurrent_calls); + EXPECT_EQ(calls_processed.load(), num_concurrent_calls); + + receiver_room->localParticipant()->unregisterRpcMethod("counter"); + caller_room.reset(); + receiver_room.reset(); +} + +// Integration test: Run for approximately 1 minute +TEST_F(RpcIntegrationTest, OneMinuteIntegration) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + std::atomic total_bytes_received{0}; + + receiver_room->localParticipant()->registerRpcMethod( + "integration-test", + [&](const RpcInvocationData &data) -> std::optional { + total_received++; + total_bytes_received += data.payload.size(); + return "ack:" + std::to_string(data.payload.size()); + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + // Run for 1 minute + const auto test_duration = 60s; + const auto start_time = std::chrono::steady_clock::now(); + + std::atomic total_sent{0}; + std::atomic successful_calls{0}; + std::atomic failed_calls{0}; + std::atomic running{true}; + + // Sender thread + std::thread sender([&]() { + std::vector payload_sizes = {100, 1024, 5 * 1024, 10 * 1024, + kMaxRpcPayloadSize}; + int size_index = 0; + + while (running.load()) { + size_t payload_size = payload_sizes[size_index % payload_sizes.size()]; + std::string payload = generateRandomPayload(payload_size); + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "integration-test", payload, 30.0); + if (response == "ack:" + std::to_string(payload_size)) { + successful_calls++; + } + } catch (const std::exception &e) { + failed_calls++; + } + + total_sent++; + size_index++; + std::this_thread::sleep_for(100ms); // Rate limit + } + }); + + // Wait for test duration + while (std::chrono::steady_clock::now() - start_time < test_duration) { + std::this_thread::sleep_for(1s); + std::cout << "Progress: sent=" << total_sent.load() + << " successful=" << successful_calls.load() + << " failed=" << failed_calls.load() + << " received=" << total_received.load() << std::endl; + } + + running.store(false); + sender.join(); + + std::cout << "\n=== Integration Test Results (1 minute) ===" << std::endl; + std::cout << "Total sent: " << total_sent.load() << std::endl; + std::cout << "Successful: " << successful_calls.load() << std::endl; + std::cout << "Failed: " << failed_calls.load() << std::endl; + std::cout << "Total received: " << total_received.load() << std::endl; + std::cout << "Total bytes received: " << total_bytes_received.load() + << std::endl; + + EXPECT_GT(successful_calls.load(), 0); + EXPECT_EQ(total_sent.load(), total_received.load()); + + receiver_room->localParticipant()->unregisterRpcMethod("integration-test"); + caller_room.reset(); + receiver_room.reset(); +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/integration/test_sdk_initialization.cpp b/src/tests/integration/test_sdk_initialization.cpp new file mode 100644 index 0000000..1c379af --- /dev/null +++ b/src/tests/integration/test_sdk_initialization.cpp @@ -0,0 +1,78 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace livekit { +namespace test { + +class SDKInitializationTest : public ::testing::Test { +protected: + void SetUp() override { + // Each test starts with a fresh SDK state + } + + void TearDown() override { + // Ensure SDK is shutdown after each test + livekit::shutdown(); + } +}; + +TEST_F(SDKInitializationTest, InitializeWithConsoleLogging) { + bool result = livekit::initialize(livekit::LogSink::kConsole); + EXPECT_TRUE(result) << "First initialization should succeed"; +} + +TEST_F(SDKInitializationTest, InitializeWithCallbackLogging) { + bool result = livekit::initialize(livekit::LogSink::kCallback); + EXPECT_TRUE(result) << "Initialization with callback logging should succeed"; +} + +TEST_F(SDKInitializationTest, DoubleInitializationReturnsFalse) { + bool first = livekit::initialize(livekit::LogSink::kConsole); + EXPECT_TRUE(first) << "First initialization should succeed"; + + bool second = livekit::initialize(livekit::LogSink::kConsole); + EXPECT_FALSE(second) << "Second initialization should return false"; +} + +TEST_F(SDKInitializationTest, ReinitializeAfterShutdown) { + bool first = livekit::initialize(livekit::LogSink::kConsole); + EXPECT_TRUE(first) << "First initialization should succeed"; + + livekit::shutdown(); + + bool second = livekit::initialize(livekit::LogSink::kConsole); + EXPECT_TRUE(second) << "Re-initialization after shutdown should succeed"; +} + +TEST_F(SDKInitializationTest, ShutdownWithoutInitialize) { + // Should not crash + EXPECT_NO_THROW(livekit::shutdown()); +} + +TEST_F(SDKInitializationTest, MultipleShutdowns) { + livekit::initialize(livekit::LogSink::kConsole); + + // Multiple shutdowns should not crash + EXPECT_NO_THROW(livekit::shutdown()); + EXPECT_NO_THROW(livekit::shutdown()); + EXPECT_NO_THROW(livekit::shutdown()); +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/stress/test_audio_frame_stress.cpp b/src/tests/stress/test_audio_frame_stress.cpp new file mode 100644 index 0000000..9ba4694 --- /dev/null +++ b/src/tests/stress/test_audio_frame_stress.cpp @@ -0,0 +1,285 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +class AudioFrameStressTest : public ::testing::Test { +protected: + void SetUp() override { livekit::initialize(livekit::LogSink::kConsole); } + + void TearDown() override { livekit::shutdown(); } +}; + +// Stress test: Rapid creation and destruction of AudioFrames +TEST_F(AudioFrameStressTest, RapidFrameCreation) { + const int num_iterations = 10000; + const int sample_rate = 48000; + const int num_channels = 2; + const int samples_per_channel = 960; // 20ms at 48kHz + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < num_iterations; ++i) { + AudioFrame frame = + AudioFrame::create(sample_rate, num_channels, samples_per_channel); + ASSERT_EQ(frame.sample_rate(), sample_rate); + ASSERT_EQ(frame.num_channels(), num_channels); + ASSERT_EQ(frame.samples_per_channel(), samples_per_channel); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Created " << num_iterations << " AudioFrames in " + << duration.count() << "ms" + << " (" << (num_iterations * 1000.0 / duration.count()) + << " frames/sec)" << std::endl; +} + +// Stress test: Large buffer allocation +TEST_F(AudioFrameStressTest, LargeBufferAllocation) { + const int sample_rate = 48000; + const int num_channels = 8; // 7.1 surround + const int samples_per_channel = 48000; // 1 second of audio + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < 100; ++i) { + AudioFrame frame = + AudioFrame::create(sample_rate, num_channels, samples_per_channel); + ASSERT_EQ(frame.total_samples(), + static_cast(num_channels * samples_per_channel)); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Created 100 large (1 second, 8-channel) AudioFrames in " + << duration.count() << "ms" << std::endl; +} + +// Stress test: Concurrent frame creation from multiple threads +TEST_F(AudioFrameStressTest, ConcurrentFrameCreation) { + const int num_threads = 8; + const int frames_per_thread = 1000; + std::atomic total_frames{0}; + std::vector threads; + + auto start = std::chrono::high_resolution_clock::now(); + + for (int t = 0; t < num_threads; ++t) { + threads.emplace_back([&total_frames, frames_per_thread]() { + for (int i = 0; i < frames_per_thread; ++i) { + AudioFrame frame = AudioFrame::create(48000, 2, 960); + if (frame.sample_rate() == 48000) { + total_frames.fetch_add(1, std::memory_order_relaxed); + } + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + EXPECT_EQ(total_frames.load(), num_threads * frames_per_thread); + + std::cout << "Created " << total_frames.load() << " AudioFrames across " + << num_threads << " threads in " << duration.count() << "ms" + << std::endl; +} + +// Stress test: Memory pressure with many simultaneous frames +TEST_F(AudioFrameStressTest, MemoryPressure) { + const int num_frames = 1000; + std::vector frames; + frames.reserve(num_frames); + + auto start = std::chrono::high_resolution_clock::now(); + + // Create many frames and keep them alive + for (int i = 0; i < num_frames; ++i) { + frames.push_back(AudioFrame::create(48000, 2, 960)); + } + + // Verify all frames are valid + for (const auto &frame : frames) { + ASSERT_EQ(frame.sample_rate(), 48000); + ASSERT_EQ(frame.num_channels(), 2); + ASSERT_EQ(frame.samples_per_channel(), 960); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Held " << num_frames << " AudioFrames simultaneously in " + << duration.count() << "ms" << std::endl; + + // Frames are destroyed when vector goes out of scope +} + +// Stress test: Data modification under load +TEST_F(AudioFrameStressTest, DataModificationUnderLoad) { + const int num_frames = 100; + const int modifications_per_frame = 100; + + auto start = std::chrono::high_resolution_clock::now(); + + for (int f = 0; f < num_frames; ++f) { + AudioFrame frame = AudioFrame::create(48000, 2, 960); + auto &data = frame.data(); + + for (int m = 0; m < modifications_per_frame; ++m) { + // Simulate audio processing + for (size_t i = 0; i < data.size(); ++i) { + data[i] = static_cast((data[i] * 2 + m) % 32767); + } + } + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Modified " << num_frames << " frames " + << modifications_per_frame << " times each in " << duration.count() + << "ms" << std::endl; +} + +// Stress test: Copy operations +TEST_F(AudioFrameStressTest, CopyOperationsStress) { + const int num_copies = 1000; + std::vector original_data(1920, 12345); + AudioFrame original(original_data, 48000, 2, 960); + + auto start = std::chrono::high_resolution_clock::now(); + + std::vector copies; + copies.reserve(num_copies); + + for (int i = 0; i < num_copies; ++i) { + copies.push_back(original); + } + + // Verify all copies are independent + for (auto © : copies) { + ASSERT_EQ(copy.data()[0], 12345); + copy.data()[0] = 0; // Modify copy + } + + // Original should be unchanged + ASSERT_EQ(original.data()[0], 12345); + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Performed " << num_copies << " copy operations in " + << duration.count() << "ms" << std::endl; +} + +// Stress test: Move operations +TEST_F(AudioFrameStressTest, MoveOperationsStress) { + const int num_moves = 10000; + + auto start = std::chrono::high_resolution_clock::now(); + + AudioFrame frame = AudioFrame::create(48000, 2, 960); + + for (int i = 0; i < num_moves; ++i) { + AudioFrame moved = std::move(frame); + frame = std::move(moved); + } + + ASSERT_EQ(frame.sample_rate(), 48000); + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Performed " << num_moves << " move operations in " + << duration.count() << "ms" << std::endl; +} + +// Stress test: Simulated real-time audio processing +TEST_F(AudioFrameStressTest, SimulatedRealtimeProcessing) { + const int duration_seconds = 1; + const int sample_rate = 48000; + const int frame_size_ms = 10; + const int frames_per_second = 1000 / frame_size_ms; + const int total_frames = duration_seconds * frames_per_second; + const int samples_per_frame = sample_rate * frame_size_ms / 1000; + + std::vector processed_frames; + processed_frames.reserve(total_frames); + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < total_frames; ++i) { + // Simulate receiving audio + AudioFrame frame = AudioFrame::create(sample_rate, 2, samples_per_frame); + + // Simulate processing (apply simple gain) + auto &data = frame.data(); + for (size_t j = 0; j < data.size(); ++j) { + data[j] = static_cast(data[j] * 0.8); + } + + processed_frames.push_back(std::move(frame)); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + double processing_time_per_frame_us = + static_cast(duration.count()) / total_frames; + double available_time_per_frame_us = frame_size_ms * 1000.0; + + std::cout << "Processed " << total_frames << " frames (" << duration_seconds + << "s of audio)" << std::endl; + std::cout << "Average processing time per frame: " + << processing_time_per_frame_us << "us" << std::endl; + std::cout << "Available time per frame: " << available_time_per_frame_us + << "us" << std::endl; + std::cout << "Processing overhead: " + << (processing_time_per_frame_us / available_time_per_frame_us * + 100) + << "%" << std::endl; + + // Processing should be fast enough for real-time + EXPECT_LT(processing_time_per_frame_us, available_time_per_frame_us) + << "Processing takes longer than real-time allows"; +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/stress/test_room_stress.cpp b/src/tests/stress/test_room_stress.cpp new file mode 100644 index 0000000..f7c791c --- /dev/null +++ b/src/tests/stress/test_room_stress.cpp @@ -0,0 +1,240 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +class RoomStressTest : public ::testing::Test { +protected: + void SetUp() override { livekit::initialize(livekit::LogSink::kConsole); } + + void TearDown() override { livekit::shutdown(); } +}; + +// Stress test: Rapid Room object creation and destruction +TEST_F(RoomStressTest, RapidRoomCreation) { + const int num_iterations = 1000; + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < num_iterations; ++i) { + Room room; + ASSERT_EQ(room.localParticipant(), nullptr); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Created and destroyed " << num_iterations << " Room objects in " + << duration.count() << "ms" + << " (" << (num_iterations * 1000.0 / duration.count()) + << " rooms/sec)" << std::endl; +} + +// Stress test: Multiple simultaneous Room objects +TEST_F(RoomStressTest, MultipleSimultaneousRooms) { + const int num_rooms = 100; + std::vector> rooms; + rooms.reserve(num_rooms); + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < num_rooms; ++i) { + rooms.push_back(std::make_unique()); + } + + // Verify all rooms are valid + for (const auto &room : rooms) { + ASSERT_NE(room, nullptr); + ASSERT_EQ(room->localParticipant(), nullptr); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Held " << num_rooms << " Room objects simultaneously in " + << duration.count() << "ms" << std::endl; + + // Rooms are destroyed when vector goes out of scope +} + +// Stress test: Concurrent Room creation from multiple threads +TEST_F(RoomStressTest, ConcurrentRoomCreation) { + const int num_threads = 4; + const int rooms_per_thread = 100; + std::atomic total_rooms{0}; + std::vector threads; + + auto start = std::chrono::high_resolution_clock::now(); + + for (int t = 0; t < num_threads; ++t) { + threads.emplace_back([&total_rooms, rooms_per_thread]() { + for (int i = 0; i < rooms_per_thread; ++i) { + Room room; + if (room.localParticipant() == nullptr) { + total_rooms.fetch_add(1, std::memory_order_relaxed); + } + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + EXPECT_EQ(total_rooms.load(), num_threads * rooms_per_thread); + + std::cout << "Created " << total_rooms.load() << " Room objects across " + << num_threads << " threads in " << duration.count() << "ms" + << std::endl; +} + +// Stress test: RoomOptions creation and copying +TEST_F(RoomStressTest, RoomOptionsStress) { + const int num_iterations = 10000; + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < num_iterations; ++i) { + RoomOptions options; + options.auto_subscribe = (i % 2 == 0); + options.dynacast = (i % 3 == 0); + + RtcConfig rtc_config; + for (int j = 0; j < 5; ++j) { + IceServer server; + server.url = "stun:stun" + std::to_string(j) + ".example.com:19302"; + server.username = "user" + std::to_string(j); + server.credential = "pass" + std::to_string(j); + rtc_config.ice_servers.push_back(server); + } + options.rtc_config = rtc_config; + + // Copy the options + RoomOptions copy = options; + ASSERT_EQ(copy.auto_subscribe, options.auto_subscribe); + ASSERT_EQ(copy.dynacast, options.dynacast); + ASSERT_TRUE(copy.rtc_config.has_value()); + ASSERT_EQ(copy.rtc_config->ice_servers.size(), 5); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Created and copied " << num_iterations << " RoomOptions in " + << duration.count() << "ms" << std::endl; +} + +// Stress test: Stream handler registration and unregistration +TEST_F(RoomStressTest, StreamHandlerRegistrationStress) { + Room room; + const int num_topics = 100; + + auto start = std::chrono::high_resolution_clock::now(); + + // Register many handlers + for (int i = 0; i < num_topics; ++i) { + std::string topic = "topic_" + std::to_string(i); + room.registerTextStreamHandler( + topic, [](std::shared_ptr, const std::string &) {}); + room.registerByteStreamHandler( + topic, [](std::shared_ptr, const std::string &) {}); + } + + // Unregister all handlers + for (int i = 0; i < num_topics; ++i) { + std::string topic = "topic_" + std::to_string(i); + room.unregisterTextStreamHandler(topic); + room.unregisterByteStreamHandler(topic); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + std::cout << "Registered and unregistered " << (num_topics * 2) + << " stream handlers in " << duration.count() << "ms" << std::endl; +} + +// Server-dependent stress tests +class RoomServerStressTest : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogSink::kConsole); + + const char *url_env = std::getenv("LIVEKIT_URL"); + const char *token_env = std::getenv("LIVEKIT_TOKEN"); + + if (url_env && token_env) { + server_url_ = url_env; + token_ = token_env; + server_available_ = true; + } + } + + void TearDown() override { livekit::shutdown(); } + + bool server_available_ = false; + std::string server_url_; + std::string token_; +}; + +TEST_F(RoomServerStressTest, RepeatedConnectDisconnect) { + if (!server_available_) { + GTEST_SKIP() + << "LIVEKIT_URL and LIVEKIT_TOKEN not set, skipping server stress test"; + } + + const int num_iterations = 10; + + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < num_iterations; ++i) { + Room room; + RoomOptions options; + + bool connected = room.Connect(server_url_, token_, options); + if (connected) { + ASSERT_NE(room.localParticipant(), nullptr); + } + // Room disconnects when it goes out of scope + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + std::cout << "Completed " << num_iterations + << " connect/disconnect cycles in " << duration.count() << "s" + << std::endl; +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp new file mode 100644 index 0000000..ba7bb6f --- /dev/null +++ b/src/tests/stress/test_rpc_stress.cpp @@ -0,0 +1,847 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +using namespace std::chrono_literals; + +// Maximum RPC payload size (15KB) +constexpr size_t kMaxRpcPayloadSize = 15 * 1024; + +// Default stress test duration in seconds (can be overridden by env var) +constexpr int kDefaultStressDurationSeconds = 3600; // 1 hour + +// Test configuration from environment variables +struct RpcStressTestConfig { + std::string url; + std::string caller_token; + std::string receiver_token; + int duration_seconds; + int num_caller_threads; + bool available = false; + + static RpcStressTestConfig fromEnv() { + RpcStressTestConfig config; + const char *url = std::getenv("LIVEKIT_URL"); + const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *duration_env = std::getenv("RPC_STRESS_DURATION_SECONDS"); + const char *threads_env = std::getenv("RPC_STRESS_CALLER_THREADS"); + + if (url && caller_token && receiver_token) { + config.url = url; + config.caller_token = caller_token; + config.receiver_token = receiver_token; + config.available = true; + } + + config.duration_seconds = + duration_env ? std::atoi(duration_env) : kDefaultStressDurationSeconds; + config.num_caller_threads = threads_env ? std::atoi(threads_env) : 4; + + return config; + } +}; + +// Statistics collector +class StressTestStats { +public: + void recordCall(bool success, double latency_ms, size_t payload_size) { + std::lock_guard lock(mutex_); + total_calls_++; + if (success) { + successful_calls_++; + latencies_.push_back(latency_ms); + total_bytes_ += payload_size; + } else { + failed_calls_++; + } + } + + void recordError(const std::string &error_type) { + std::lock_guard lock(mutex_); + error_counts_[error_type]++; + } + + void printStats() const { + std::lock_guard lock(mutex_); + + std::cout << "\n========================================" << std::endl; + std::cout << " RPC Stress Test Statistics " << std::endl; + std::cout << "========================================" << std::endl; + std::cout << "Total calls: " << total_calls_ << std::endl; + std::cout << "Successful: " << successful_calls_ << std::endl; + std::cout << "Failed: " << failed_calls_ << std::endl; + std::cout << "Success rate: " << std::fixed << std::setprecision(2) + << (total_calls_ > 0 ? (100.0 * successful_calls_ / total_calls_) + : 0.0) + << "%" << std::endl; + std::cout << "Total bytes: " << total_bytes_ << " (" + << (total_bytes_ / (1024.0 * 1024.0)) << " MB)" << std::endl; + + if (!latencies_.empty()) { + std::vector sorted_latencies = latencies_; + std::sort(sorted_latencies.begin(), sorted_latencies.end()); + + double sum = std::accumulate(sorted_latencies.begin(), + sorted_latencies.end(), 0.0); + double avg = sum / sorted_latencies.size(); + double min = sorted_latencies.front(); + double max = sorted_latencies.back(); + double p50 = sorted_latencies[sorted_latencies.size() * 50 / 100]; + double p95 = sorted_latencies[sorted_latencies.size() * 95 / 100]; + double p99 = sorted_latencies[sorted_latencies.size() * 99 / 100]; + + std::cout << "\nLatency (ms):" << std::endl; + std::cout << " Min: " << min << std::endl; + std::cout << " Avg: " << avg << std::endl; + std::cout << " P50: " << p50 << std::endl; + std::cout << " P95: " << p95 << std::endl; + std::cout << " P99: " << p99 << std::endl; + std::cout << " Max: " << max << std::endl; + } + + if (!error_counts_.empty()) { + std::cout << "\nError breakdown:" << std::endl; + for (const auto &pair : error_counts_) { + std::cout << " " << pair.first << ": " << pair.second << std::endl; + } + } + + std::cout << "========================================\n" << std::endl; + } + + int totalCalls() const { + std::lock_guard lock(mutex_); + return total_calls_; + } + + int successfulCalls() const { + std::lock_guard lock(mutex_); + return successful_calls_; + } + + int failedCalls() const { + std::lock_guard lock(mutex_); + return failed_calls_; + } + +private: + mutable std::mutex mutex_; + int total_calls_ = 0; + int successful_calls_ = 0; + int failed_calls_ = 0; + size_t total_bytes_ = 0; + std::vector latencies_; + std::map error_counts_; +}; + +// Generate a random string of specified size +std::string generateRandomPayload(size_t size) { + static const char charset[] = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + static thread_local std::random_device rd; + static thread_local std::mt19937 gen(rd()); + static std::uniform_int_distribution<> dis(0, sizeof(charset) - 2); + + std::string result; + result.reserve(size); + for (size_t i = 0; i < size; ++i) { + result += charset[dis(gen)]; + } + return result; +} + +// Wait for a remote participant to appear +bool waitForParticipant(Room *room, const std::string &identity, + std::chrono::milliseconds timeout) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + if (room->remoteParticipant(identity) != nullptr) { + return true; + } + std::this_thread::sleep_for(100ms); + } + return false; +} + +class RpcStressTest : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogSink::kConsole); + config_ = RpcStressTestConfig::fromEnv(); + } + + void TearDown() override { livekit::shutdown(); } + + RpcStressTestConfig config_; +}; + +// Long-running stress test with max payload sizes +TEST_F(RpcStressTest, MaxPayloadStress) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + std::cout << "\n=== RPC Max Payload Stress Test ===" << std::endl; + std::cout << "Duration: " << config_.duration_seconds << " seconds" + << std::endl; + std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; + std::cout << "Max payload size: " << kMaxRpcPayloadSize << " bytes (15KB)" + << std::endl; + + // Create receiver room + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + + // Register RPC handler that processes max payloads + receiver_room->localParticipant()->registerRpcMethod( + "max-payload-stress", + [&total_received]( + const RpcInvocationData &data) -> std::optional { + total_received++; + // Return checksum of payload for verification + size_t checksum = 0; + for (char c : data.payload) { + checksum += static_cast(c); + } + return std::to_string(data.payload.size()) + ":" + + std::to_string(checksum); + }); + + // Create caller room + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + std::cout << "Both rooms connected. Starting stress test..." << std::endl; + + StressTestStats stats; + std::atomic running{true}; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.duration_seconds); + + // Create caller threads + std::vector caller_threads; + for (int t = 0; t < config_.num_caller_threads; ++t) { + caller_threads.emplace_back([&, thread_id = t]() { + while (running.load()) { + // Always use max payload size (15KB) + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + // Calculate expected checksum + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "max-payload-stress", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + // Verify response + std::string expected_response = std::to_string(kMaxRpcPayloadSize) + + ":" + + std::to_string(expected_checksum); + + if (response == expected_response) { + stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); + stats.recordError("checksum_mismatch"); + } + } catch (const RpcError &e) { + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); + + auto code = static_cast(e.code()); + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { + stats.recordError("timeout"); + } else if (code == RpcError::ErrorCode::CONNECTION_TIMEOUT) { + stats.recordError("connection_timeout"); + } else if (code == RpcError::ErrorCode::RECIPIENT_DISCONNECTED) { + stats.recordError("recipient_disconnected"); + } else { + stats.recordError("rpc_error_" + std::to_string(e.code())); + } + } catch (const std::exception &e) { + stats.recordCall(false, 0, kMaxRpcPayloadSize); + stats.recordError("exception"); + } + + // Small delay between calls to avoid overwhelming + std::this_thread::sleep_for(10ms); + } + }); + } + + // Progress reporting thread + std::thread progress_thread([&]() { + int last_total = 0; + while (running.load()) { + std::this_thread::sleep_for(30s); + if (!running.load()) + break; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + int current_total = stats.totalCalls(); + int calls_per_30s = current_total - last_total; + last_total = current_total; + + std::cout << "[" << elapsed_seconds << "s] Total: " << current_total + << " | Success: " << stats.successfulCalls() + << " | Failed: " << stats.failedCalls() + << " | Rate: " << (calls_per_30s / 30.0) << " calls/sec" + << " | Received: " << total_received.load() << std::endl; + } + }); + + // Wait for test duration + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + std::cout << "\nStopping stress test..." << std::endl; + running.store(false); + + // Wait for all threads + for (auto &t : caller_threads) { + t.join(); + } + progress_thread.join(); + + stats.printStats(); + + // Verify results + EXPECT_GT(stats.successfulCalls(), 0) << "No successful calls"; + double success_rate = + (stats.totalCalls() > 0) + ? (100.0 * stats.successfulCalls() / stats.totalCalls()) + : 0.0; + EXPECT_GT(success_rate, 95.0) << "Success rate below 95%"; + + receiver_room->localParticipant()->unregisterRpcMethod("max-payload-stress"); + caller_room.reset(); + receiver_room.reset(); +} + +// Stress test with varying payload sizes +TEST_F(RpcStressTest, VaryingPayloadStress) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + std::cout << "\n=== RPC Varying Payload Stress Test ===" << std::endl; + std::cout << "Duration: " << config_.duration_seconds << " seconds" + << std::endl; + std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + std::map> received_by_size; + std::mutex size_map_mutex; + + receiver_room->localParticipant()->registerRpcMethod( + "varying-payload-stress", + [&](const RpcInvocationData &data) -> std::optional { + total_received++; + size_t size = data.payload.size(); + + { + std::lock_guard lock(size_map_mutex); + received_by_size[size]++; + } + + return std::to_string(size); + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + StressTestStats stats; + std::atomic running{true}; + + // Payload sizes to test + std::vector payload_sizes = { + 100, // Small + 1024, // 1KB + 5 * 1024, // 5KB + 10 * 1024, // 10KB + kMaxRpcPayloadSize - 1, // Just under max + kMaxRpcPayloadSize // Max (15KB) + }; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.duration_seconds); + + std::vector caller_threads; + for (int t = 0; t < config_.num_caller_threads; ++t) { + caller_threads.emplace_back([&, thread_id = t]() { + int call_count = 0; + while (running.load()) { + size_t payload_size = payload_sizes[call_count % payload_sizes.size()]; + std::string payload = generateRandomPayload(payload_size); + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "varying-payload-stress", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + if (response == std::to_string(payload_size)) { + stats.recordCall(true, latency_ms, payload_size); + } else { + stats.recordCall(false, latency_ms, payload_size); + stats.recordError("size_mismatch"); + } + } catch (const RpcError &e) { + stats.recordCall(false, 0, payload_size); + auto code = static_cast(e.code()); + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { + stats.recordError("timeout"); + } else { + stats.recordError("rpc_error"); + } + } catch (const std::exception &) { + stats.recordCall(false, 0, payload_size); + stats.recordError("exception"); + } + + call_count++; + std::this_thread::sleep_for(5ms); + } + }); + } + + // Progress reporting + std::thread progress_thread([&]() { + while (running.load()) { + std::this_thread::sleep_for(30s); + if (!running.load()) + break; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + + std::cout << "[" << elapsed_seconds << "s] Total: " << stats.totalCalls() + << " | Success: " << stats.successfulCalls() + << " | Failed: " << stats.failedCalls() << std::endl; + } + }); + + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + running.store(false); + + for (auto &t : caller_threads) { + t.join(); + } + progress_thread.join(); + + stats.printStats(); + + // Print breakdown by size + std::cout << "Received by payload size:" << std::endl; + { + std::lock_guard lock(size_map_mutex); + for (const auto &pair : received_by_size) { + std::cout << " " << pair.first << " bytes: " << pair.second.load() + << std::endl; + } + } + + EXPECT_GT(stats.successfulCalls(), 0); + double success_rate = + (stats.totalCalls() > 0) + ? (100.0 * stats.successfulCalls() / stats.totalCalls()) + : 0.0; + EXPECT_GT(success_rate, 95.0) << "Success rate below 95%"; + + receiver_room->localParticipant()->unregisterRpcMethod( + "varying-payload-stress"); + caller_room.reset(); + receiver_room.reset(); +} + +// Stress test for bidirectional RPC (both sides can call each other) +TEST_F(RpcStressTest, BidirectionalRpcStress) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + std::cout << "\n=== Bidirectional RPC Stress Test ===" << std::endl; + std::cout << "Duration: " << config_.duration_seconds << " seconds" + << std::endl; + + auto room_a = std::make_unique(); + auto room_b = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool a_connected = + room_a->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(a_connected) << "Room A failed to connect"; + + bool b_connected = + room_b->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(b_connected) << "Room B failed to connect"; + + std::string identity_a = room_a->localParticipant()->identity(); + std::string identity_b = room_b->localParticipant()->identity(); + + ASSERT_TRUE(waitForParticipant(room_a.get(), identity_b, 10s)) + << "Room B not visible to Room A"; + ASSERT_TRUE(waitForParticipant(room_b.get(), identity_a, 10s)) + << "Room A not visible to Room B"; + + std::atomic a_received{0}; + std::atomic b_received{0}; + + // Register handlers on both sides + room_a->localParticipant()->registerRpcMethod( + "ping", + [&a_received]( + const RpcInvocationData &data) -> std::optional { + a_received++; + return "pong:" + data.payload; + }); + + room_b->localParticipant()->registerRpcMethod( + "ping", + [&b_received]( + const RpcInvocationData &data) -> std::optional { + b_received++; + return "pong:" + data.payload; + }); + + StressTestStats stats_a_to_b; + StressTestStats stats_b_to_a; + std::atomic running{true}; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.duration_seconds); + + // A calling B + std::thread thread_a_to_b([&]() { + int counter = 0; + while (running.load()) { + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = room_a->localParticipant()->performRpc( + identity_b, "ping", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + if (response == "pong:" + payload) { + stats_a_to_b.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats_a_to_b.recordCall(false, latency_ms, kMaxRpcPayloadSize); + } + } catch (...) { + stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); + } + + counter++; + std::this_thread::sleep_for(20ms); + } + }); + + // B calling A + std::thread thread_b_to_a([&]() { + int counter = 0; + while (running.load()) { + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = room_b->localParticipant()->performRpc( + identity_a, "ping", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + if (response == "pong:" + payload) { + stats_b_to_a.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats_b_to_a.recordCall(false, latency_ms, kMaxRpcPayloadSize); + } + } catch (...) { + stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); + } + + counter++; + std::this_thread::sleep_for(20ms); + } + }); + + // Progress + std::thread progress_thread([&]() { + while (running.load()) { + std::this_thread::sleep_for(30s); + if (!running.load()) + break; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + + std::cout << "[" << elapsed_seconds << "s] " + << "A->B: " << stats_a_to_b.successfulCalls() << "/" + << stats_a_to_b.totalCalls() << " | " + << "B->A: " << stats_b_to_a.successfulCalls() << "/" + << stats_b_to_a.totalCalls() << " | " + << "A rcvd: " << a_received.load() + << " | B rcvd: " << b_received.load() << std::endl; + } + }); + + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + running.store(false); + + thread_a_to_b.join(); + thread_b_to_a.join(); + progress_thread.join(); + + std::cout << "\n=== A -> B Statistics ===" << std::endl; + stats_a_to_b.printStats(); + + std::cout << "\n=== B -> A Statistics ===" << std::endl; + stats_b_to_a.printStats(); + + EXPECT_GT(stats_a_to_b.successfulCalls(), 0); + EXPECT_GT(stats_b_to_a.successfulCalls(), 0); + + room_a->localParticipant()->unregisterRpcMethod("ping"); + room_b->localParticipant()->unregisterRpcMethod("ping"); + room_a.reset(); + room_b.reset(); +} + +// High throughput stress test (short bursts) +TEST_F(RpcStressTest, HighThroughputBurst) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + std::cout << "\n=== High Throughput Burst Test ===" << std::endl; + std::cout << "Duration: " << config_.duration_seconds << " seconds" + << std::endl; + std::cout << "Testing rapid-fire RPC with max payload (15KB)..." << std::endl; + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + + receiver_room->localParticipant()->registerRpcMethod( + "burst-test", + [&total_received]( + const RpcInvocationData &data) -> std::optional { + total_received++; + return std::to_string(data.payload.size()); + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + StressTestStats stats; + std::atomic running{true}; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.duration_seconds); + + // Multiple threads sending as fast as possible + std::vector burst_threads; + for (int t = 0; t < config_.num_caller_threads * 2; ++t) { + burst_threads.emplace_back([&]() { + while (running.load()) { + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "burst-test", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } catch (...) { + stats.recordCall(false, 0, kMaxRpcPayloadSize); + } + + // No delay - burst mode + } + }); + } + + // Progress + std::thread progress_thread([&]() { + int last_total = 0; + while (running.load()) { + std::this_thread::sleep_for(10s); + if (!running.load()) + break; + + int current = stats.totalCalls(); + double rate = (current - last_total) / 10.0; + last_total = current; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + + std::cout << "[" << elapsed_seconds << "s] " + << "Total: " << current + << " | Success: " << stats.successfulCalls() + << " | Rate: " << rate << " calls/sec" + << " | Throughput: " << (rate * kMaxRpcPayloadSize / 1024.0) + << " KB/sec" << std::endl; + } + }); + + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + running.store(false); + + for (auto &t : burst_threads) { + t.join(); + } + progress_thread.join(); + + stats.printStats(); + + auto total_time = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time) + .count(); + double avg_rate = static_cast(stats.totalCalls()) / total_time; + double throughput_kbps = + (static_cast(stats.successfulCalls()) * kMaxRpcPayloadSize) / + (total_time * 1024.0); + + std::cout << "Average rate: " << avg_rate << " calls/sec" << std::endl; + std::cout << "Average throughput: " << throughput_kbps << " KB/sec" + << std::endl; + + EXPECT_GT(stats.successfulCalls(), 0); + + receiver_room->localParticipant()->unregisterRpcMethod("burst-test"); + caller_room.reset(); + receiver_room.reset(); +} + +} // namespace test +} // namespace livekit From 87f66faac40a26283d89e1b028f1134479af4259 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Tue, 3 Feb 2026 12:26:24 -0800 Subject: [PATCH 2/3] change the tests --- src/tests/integration/test_rpc.cpp | 31 ++++++++++++++----- src/tests/stress/test_rpc_stress.cpp | 45 +++++++++++++++++++++++----- 2 files changed, 61 insertions(+), 15 deletions(-) diff --git a/src/tests/integration/test_rpc.cpp b/src/tests/integration/test_rpc.cpp index 547f1da..4e341fc 100644 --- a/src/tests/integration/test_rpc.cpp +++ b/src/tests/integration/test_rpc.cpp @@ -57,20 +57,37 @@ struct RpcTestConfig { } }; -// Generate a random string of specified size +// Sample sentences for generating compressible payloads +static const std::vector kSampleSentences = { + "The quick brown fox jumps over the lazy dog. ", + "LiveKit is a real-time communication platform for building video and " + "audio applications. ", + "RPC allows participants to call methods on remote peers with " + "request-response semantics. ", + "This test measures the performance and reliability of the RPC system. ", + "WebRTC enables peer-to-peer communication for real-time media streaming. ", +}; + +// Generate a payload of specified size using repeating sentences (compressible) std::string generateRandomPayload(size_t size) { - static const char charset[] = - "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; static std::random_device rd; static std::mt19937 gen(rd()); - static std::uniform_int_distribution<> dis(0, sizeof(charset) - 2); + static std::uniform_int_distribution dis(0, + kSampleSentences.size() - 1); std::string result; result.reserve(size); - for (size_t i = 0; i < size; ++i) { - result += charset[dis(gen)]; + + // Start with a random sentence to add some variation between payloads + size_t start_idx = dis(gen); + + while (result.size() < size) { + const std::string &sentence = + kSampleSentences[(start_idx + result.size()) % kSampleSentences.size()]; + result += sentence; } - return result; + + return result.substr(0, size); } // Wait for a remote participant to appear diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index ba7bb6f..7902b1a 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -40,7 +40,7 @@ using namespace std::chrono_literals; constexpr size_t kMaxRpcPayloadSize = 15 * 1024; // Default stress test duration in seconds (can be overridden by env var) -constexpr int kDefaultStressDurationSeconds = 3600; // 1 hour +constexpr int kDefaultStressDurationSeconds = 600; // 10mins // Test configuration from environment variables struct RpcStressTestConfig { @@ -167,20 +167,49 @@ class StressTestStats { std::map error_counts_; }; -// Generate a random string of specified size +// Sample sentences for generating compressible payloads +static const std::vector kSampleSentences = { + "The quick brown fox jumps over the lazy dog. ", + "LiveKit is a real-time communication platform for building video and " + "audio applications. ", + "RPC allows participants to call methods on remote peers with " + "request-response semantics. ", + "This stress test measures the performance and reliability of the RPC " + "system under load. ", + "WebRTC enables peer-to-peer communication for real-time media streaming. ", + "The payload is compressed using Zstd to reduce bandwidth and improve " + "throughput. ", + "Data channels provide reliable or unreliable delivery of arbitrary " + "application data. ", + "Participants can publish audio and video tracks to share media with " + "others in the room. ", + "The signaling server coordinates connection establishment between peers. ", + "End-to-end encryption ensures that media content is only accessible to " + "participants. ", +}; + +// Generate a payload of specified size using repeating sentences (compressible) std::string generateRandomPayload(size_t size) { - static const char charset[] = - "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; static thread_local std::random_device rd; static thread_local std::mt19937 gen(rd()); - static std::uniform_int_distribution<> dis(0, sizeof(charset) - 2); + static std::uniform_int_distribution dis(0, + kSampleSentences.size() - 1); std::string result; result.reserve(size); - for (size_t i = 0; i < size; ++i) { - result += charset[dis(gen)]; + + // Start with a random sentence to add some variation between payloads + size_t start_idx = dis(gen); + + while (result.size() < size) { + // Cycle through sentences starting from a random position + const std::string &sentence = + kSampleSentences[(start_idx + result.size()) % kSampleSentences.size()]; + result += sentence; } - return result; + + // Trim to exact size + return result.substr(0, size); } // Wait for a remote participant to appear From 923b05c1ac4c8b17b7bed70c63c2217b1decdc0a Mon Sep 17 00:00:00 2001 From: shijing xian Date: Tue, 3 Feb 2026 17:40:27 -0800 Subject: [PATCH 3/3] get integration passed --- include/livekit/local_participant.h | 22 +++++++++ src/local_participant.cpp | 68 +++++++++++++++++++++++++++ src/room.cpp | 13 +++++ src/tests/integration/test_room.cpp | 2 +- src/tests/integration/test_rpc.cpp | 23 +++++++-- src/tests/stress/test_room_stress.cpp | 2 +- src/tests/stress/test_rpc_stress.cpp | 40 ++++++++++++++-- 7 files changed, 159 insertions(+), 11 deletions(-) diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 0a7352a..a624238 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -21,9 +21,12 @@ #include "livekit/room_event_types.h" #include "livekit/rpc_error.h" +#include +#include #include #include #include +#include #include #include #include @@ -185,6 +188,13 @@ class LocalParticipant : public Participant { void unregisterRpcMethod(const std::string &method_name); protected: + /** + * Shutdown the local participant, cleaning up all resources. + * + * This unregisters all RPC handlers and prepares the participant for + * destruction. Called by Room during its destruction sequence. + */ + void shutdown(); // Called by Room when an rpc_method_invocation event is received from the // SFU. This is internal plumbing and not intended to be called directly by // SDK users. @@ -202,6 +212,18 @@ class LocalParticipant : public Participant { private: PublicationMap track_publications_; std::unordered_map rpc_handlers_; + + // Shared state for RPC invocation tracking. Using shared_ptr so the state + // can outlive the LocalParticipant if there are in-flight invocations when + // the participant is destroyed. + struct RpcInvocationState { + std::mutex mutex; + std::condition_variable cv; + int active_invocations = 0; + bool shutting_down = false; + }; + std::shared_ptr rpc_state_ = + std::make_shared(); }; } // namespace livekit diff --git a/src/local_participant.cpp b/src/local_participant.cpp index 0ffc472..aa1b869 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -29,6 +29,7 @@ #include "track.pb.h" #include "track_proto_converter.h" +#include #include namespace livekit { @@ -276,10 +277,68 @@ void LocalParticipant::unregisterRpcMethod(const std::string &method_name) { (void)FfiClient::instance().sendRequest(req); } +void LocalParticipant::shutdown() { + // Mark as shutting down and wait for all active invocations to complete + { + std::unique_lock lock(rpc_state_->mutex); + rpc_state_->shutting_down = true; + // Wait up to 5 seconds for active RPC invocations to complete. + // If timeout expires, proceed anyway - late responses will fail but + // at least we won't block shutdown indefinitely. + rpc_state_->cv.wait_for(lock, std::chrono::seconds(5), [this] { + return rpc_state_->active_invocations == 0; + }); + } + + auto handle_id = ffiHandleId(); + // If handle is invalid, just clear local handlers - FFI cleanup not possible + if (handle_id == 0) { + rpc_handlers_.clear(); + return; + } + + // Unregister all RPC methods with FFI and clear local handlers + for (const auto &pair : rpc_handlers_) { + FfiRequest req; + auto *msg = req.mutable_unregister_rpc_method(); + msg->set_local_participant_handle(static_cast(handle_id)); + msg->set_method(pair.first); + (void)FfiClient::instance().sendRequest(req); + } + rpc_handlers_.clear(); +} + void LocalParticipant::handleRpcMethodInvocation( uint64_t invocation_id, const std::string &method, const std::string &request_id, const std::string &caller_identity, const std::string &payload, double response_timeout_sec) { + // Capture shared state so it outlives LocalParticipant if needed + auto state = rpc_state_; + + // Track this invocation and check if we're shutting down + { + std::lock_guard lock(state->mutex); + if (state->shutting_down) { + // Already shutting down, don't process new invocations + return; + } + state->active_invocations++; + } + + // RAII guard to decrement counter and notify on exit. + // Captures shared_ptr to state so mutex stays valid even if + // LocalParticipant is destroyed during handler execution. + struct InvocationGuard { + std::shared_ptr state; + ~InvocationGuard() { + std::lock_guard lock(state->mutex); + state->active_invocations--; + if (state->active_invocations == 0) { + state->cv.notify_all(); + } + } + } guard{state}; + std::optional response_error; std::optional response_payload; RpcInvocationData params{request_id, caller_identity, payload, @@ -305,6 +364,15 @@ void LocalParticipant::handleRpcMethodInvocation( } } + // Check again if shutdown started during handler execution + { + std::lock_guard lock(state->mutex); + if (state->shutting_down) { + // Shutdown started, don't send response - handle may be invalid + return; + } + } + FfiRequest req; auto *msg = req.mutable_rpc_method_invocation_response(); msg->set_local_participant_handle(ffiHandleId()); diff --git a/src/room.cpp b/src/room.cpp index 174b64b..f2a7176 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -69,14 +69,27 @@ Room::Room() {} Room::~Room() { int listener_to_remove = 0; + std::unique_ptr local_participant_to_cleanup; { std::lock_guard g(lock_); listener_to_remove = listener_id_; listener_id_ = 0; + // Move local participant out for cleanup outside the lock + local_participant_to_cleanup = std::move(local_participant_); } + + // Shutdown local participant (unregisters RPC handlers, etc.) before + // removing the listener. This prevents in-flight RPC responses from + // trying to use destroyed handles. + if (local_participant_to_cleanup) { + local_participant_to_cleanup->shutdown(); + } + if (listener_to_remove != 0) { FfiClient::instance().RemoveListener(listener_to_remove); } + + // local_participant_to_cleanup is destroyed here after listener is removed } void Room::setDelegate(RoomDelegate *delegate) { diff --git a/src/tests/integration/test_room.cpp b/src/tests/integration/test_room.cpp index fca83a0..2cf1ae0 100644 --- a/src/tests/integration/test_room.cpp +++ b/src/tests/integration/test_room.cpp @@ -104,7 +104,7 @@ class RoomServerTest : public ::testing::Test { livekit::initialize(livekit::LogSink::kConsole); const char *url_env = std::getenv("LIVEKIT_URL"); - const char *token_env = std::getenv("LIVEKIT_TOKEN"); + const char *token_env = std::getenv("LIVEKIT_CALLER_TOKEN"); if (url_env && token_env) { server_url_ = url_env; diff --git a/src/tests/integration/test_rpc.cpp b/src/tests/integration/test_rpc.cpp index 4e341fc..fd89a8b 100644 --- a/src/tests/integration/test_rpc.cpp +++ b/src/tests/integration/test_rpc.cpp @@ -133,14 +133,20 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { std::string receiver_identity = receiver_room->localParticipant()->identity(); - // Register RPC handler on receiver + // Register RPC handler on receiver - returns size and checksum instead of + // full payload std::atomic rpc_calls_received{0}; receiver_room->localParticipant()->registerRpcMethod( "echo", [&rpc_calls_received]( const RpcInvocationData &data) -> std::optional { rpc_calls_received++; - return "echo: " + data.payload; + size_t checksum = 0; + for (char c : data.payload) { + checksum += static_cast(c); + } + return "echo:" + std::to_string(data.payload.size()) + ":" + + std::to_string(checksum); }); // Create caller room @@ -158,10 +164,19 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; // Perform RPC call + std::string test_payload = "hello world"; std::string response = caller_room->localParticipant()->performRpc( - receiver_identity, "echo", "hello world", 10.0); + receiver_identity, "echo", test_payload, 10.0); - EXPECT_EQ(response, "echo: hello world"); + // Verify response contains correct size and checksum + size_t expected_checksum = 0; + for (char c : test_payload) { + expected_checksum += static_cast(c); + } + std::string expected_response = + "echo:" + std::to_string(test_payload.size()) + ":" + + std::to_string(expected_checksum); + EXPECT_EQ(response, expected_response); EXPECT_EQ(rpc_calls_received.load(), 1); // Cleanup diff --git a/src/tests/stress/test_room_stress.cpp b/src/tests/stress/test_room_stress.cpp index f7c791c..e913b5b 100644 --- a/src/tests/stress/test_room_stress.cpp +++ b/src/tests/stress/test_room_stress.cpp @@ -191,7 +191,7 @@ class RoomServerStressTest : public ::testing::Test { livekit::initialize(livekit::LogSink::kConsole); const char *url_env = std::getenv("LIVEKIT_URL"); - const char *token_env = std::getenv("LIVEKIT_TOKEN"); + const char *token_env = std::getenv("LIVEKIT_CALLER_TOKEN"); if (url_env && token_env) { server_url_ = url_env; diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index 7902b1a..b941e42 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -40,7 +40,7 @@ using namespace std::chrono_literals; constexpr size_t kMaxRpcPayloadSize = 15 * 1024; // Default stress test duration in seconds (can be overridden by env var) -constexpr int kDefaultStressDurationSeconds = 600; // 10mins +constexpr int kDefaultStressDurationSeconds = 3600; // 1 hour // Test configuration from environment variables struct RpcStressTestConfig { @@ -611,12 +611,19 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { std::atomic b_received{0}; // Register handlers on both sides + // Return checksum instead of full payload to avoid exceeding 15KB response + // limit room_a->localParticipant()->registerRpcMethod( "ping", [&a_received]( const RpcInvocationData &data) -> std::optional { a_received++; - return "pong:" + data.payload; + size_t checksum = 0; + for (char c : data.payload) { + checksum += static_cast(c); + } + return "pong:" + std::to_string(data.payload.size()) + ":" + + std::to_string(checksum); }); room_b->localParticipant()->registerRpcMethod( @@ -624,7 +631,12 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { [&b_received]( const RpcInvocationData &data) -> std::optional { b_received++; - return "pong:" + data.payload; + size_t checksum = 0; + for (char c : data.payload) { + checksum += static_cast(c); + } + return "pong:" + std::to_string(data.payload.size()) + ":" + + std::to_string(checksum); }); StressTestStats stats_a_to_b; @@ -639,6 +651,15 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { int counter = 0; while (running.load()) { std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + // Calculate expected checksum for verification + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + std::string expected_response = "pong:" + std::to_string(payload.size()) + + ":" + std::to_string(expected_checksum); + auto call_start = std::chrono::high_resolution_clock::now(); try { @@ -650,7 +671,7 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { std::chrono::duration(call_end - call_start) .count(); - if (response == "pong:" + payload) { + if (response == expected_response) { stats_a_to_b.recordCall(true, latency_ms, kMaxRpcPayloadSize); } else { stats_a_to_b.recordCall(false, latency_ms, kMaxRpcPayloadSize); @@ -669,6 +690,15 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { int counter = 0; while (running.load()) { std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + // Calculate expected checksum for verification + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + std::string expected_response = "pong:" + std::to_string(payload.size()) + + ":" + std::to_string(expected_checksum); + auto call_start = std::chrono::high_resolution_clock::now(); try { @@ -680,7 +710,7 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { std::chrono::duration(call_end - call_start) .count(); - if (response == "pong:" + payload) { + if (response == expected_response) { stats_b_to_a.recordCall(true, latency_ms, kMaxRpcPayloadSize); } else { stats_b_to_a.recordCall(false, latency_ms, kMaxRpcPayloadSize);