diff --git a/docs/api/rest.rst b/docs/api/rest.rst index 14e76d319..851037e43 100644 --- a/docs/api/rest.rst +++ b/docs/api/rest.rst @@ -169,12 +169,75 @@ Functions ``GET /api/v1/functions/{function_id}/hosts`` List apps that host this function. +``GET /api/v1/functions/{function_id}/x-medkit-graph`` + Get a function-scoped topology snapshot with per-topic metrics and pipeline status. + + **Example Response:** + + .. code-block:: json + + { + "x-medkit-graph": { + "schema_version": "1.0.0", + "graph_id": "perception_graph-graph", + "timestamp": "2026-03-08T12:00:00.000Z", + "scope": { + "type": "function", + "entity_id": "perception_graph" + }, + "pipeline_status": "degraded", + "bottleneck_edge": "edge-2", + "topics": [ + { + "topic_id": "topic-1", + "name": "/camera/front/image_raw" + } + ], + "nodes": [ + { + "entity_id": "camera_front", + "node_status": "reachable" + }, + { + "entity_id": "detector", + "node_status": "unreachable", + "last_seen": "2026-03-08T11:59:42.100Z" + } + ], + "edges": [ + { + "edge_id": "edge-2", + "source": "camera_front", + "target": "detector", + "topic_id": "topic-1", + "transport_type": "unknown", + "metrics": { + "source": "greenwave_monitor", + "frequency_hz": 12.5, + "latency_ms": 4.2, + "drop_rate_percent": 0.0, + "metrics_status": "active" + } + } + ] + } + } + + **Field Notes:** + + - ``pipeline_status``: overall graph state, one of ``healthy``, ``degraded``, ``broken`` + - ``node_status``: per-node reachability, one of ``reachable``, ``unreachable`` + - ``metrics_status``: per-edge telemetry state, one of ``pending``, ``active``, ``error`` + - ``error_reason``: present when ``metrics_status`` is ``error``; one of ``node_offline``, ``topic_stale``, ``no_data_source`` + .. note:: **ros2_medkit extension:** Functions support resource collections beyond the SOVD spec. ``/data`` and ``/operations`` aggregate from hosted apps (per SOVD). Additionally, - ``/configurations``, ``/faults``, ``/logs`` aggregate from hosts, and read-only - ``/bulk-data`` is available. See :ref:`sovd-compliance` for details. + ``/configurations``, ``/faults``, ``/logs`` aggregate from hosts, read-only + ``/bulk-data`` is available, ``/cyclic-subscriptions`` is supported, and + the vendor resource ``/x-medkit-graph`` exposes a function-scoped graph snapshot. + See :ref:`sovd-compliance` for details. Data Endpoints -------------- @@ -1005,7 +1068,7 @@ Subscriptions are temporary - they do not survive server restart. - ``faults`` - Fault list (resource path optional, e.g. ``/faults`` or ``/faults/fault_001``) - ``configurations`` - Parameter values (resource path optional) - ``logs`` - Application log entries from ``/rosout`` -- ``x-*`` - Vendor extensions (e.g. ``x-medkit-metrics``) +- ``x-*`` - Vendor extensions (e.g. ``x-medkit-graph``) **Interval values:** @@ -1016,7 +1079,7 @@ Subscriptions are temporary - they do not survive server restart. ``POST /api/v1/{entity_type}/{entity_id}/cyclic-subscriptions`` Create a new cyclic subscription. - **Applies to:** ``/apps``, ``/components`` + **Applies to:** ``/apps``, ``/components``, ``/functions`` **Request Body:** @@ -1032,7 +1095,8 @@ Subscriptions are temporary - they do not survive server restart. **Fields:** - ``resource`` (string, required): Full SOVD resource URI to observe - (e.g. ``/api/v1/apps/{id}/data/{topic}``, ``/api/v1/apps/{id}/faults``) + (e.g. ``/api/v1/apps/{id}/data/{topic}``, ``/api/v1/apps/{id}/faults``, + ``/api/v1/functions/{id}/x-medkit-graph``) - ``protocol`` (string, optional): Transport protocol. Only ``"sse"`` supported. Default: ``"sse"`` - ``interval`` (string, required): One of ``fast``, ``normal``, ``slow`` - ``duration`` (integer, required): Subscription lifetime in seconds. @@ -1371,7 +1435,7 @@ extends this to areas and functions where aggregation makes practical sense: - \- - yes - yes - - \- + - yes - apps, components Other extensions beyond SOVD: diff --git a/src/ros2_medkit_gateway/CMakeLists.txt b/src/ros2_medkit_gateway/CMakeLists.txt index c736a3c14..df5faae2a 100644 --- a/src/ros2_medkit_gateway/CMakeLists.txt +++ b/src/ros2_medkit_gateway/CMakeLists.txt @@ -26,6 +26,7 @@ endif() # Find dependencies find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) +find_package(diagnostic_msgs REQUIRED) find_package(std_msgs REQUIRED) find_package(std_srvs REQUIRED) find_package(sensor_msgs REQUIRED) @@ -147,10 +148,12 @@ add_library(gateway_lib STATIC src/plugins/plugin_context.cpp src/plugins/plugin_loader.cpp src/plugins/plugin_manager.cpp + src/plugins/graph_provider_plugin.cpp ) medkit_target_dependencies(gateway_lib rclcpp + diagnostic_msgs std_msgs std_srvs rcl_interfaces @@ -337,6 +340,11 @@ if(BUILD_TESTING) ament_add_gtest(test_merge_pipeline test/test_merge_pipeline.cpp) target_link_libraries(test_merge_pipeline gateway_lib) + # Add graph provider plugin tests + ament_add_gtest(test_graph_provider_plugin test/test_graph_provider_plugin.cpp) + target_link_libraries(test_graph_provider_plugin gateway_lib) + set_tests_properties(test_graph_provider_plugin PROPERTIES ENVIRONMENT "ROS_DOMAIN_ID=70") + # Add capability builder tests ament_add_gtest(test_capability_builder test/test_capability_builder.cpp) target_link_libraries(test_capability_builder gateway_lib) @@ -516,6 +524,7 @@ if(BUILD_TESTING) test_log_manager test_log_handlers test_merge_pipeline + test_graph_provider_plugin test_runtime_linker ) foreach(_target ${_test_targets}) diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/graph_provider_plugin.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/graph_provider_plugin.hpp new file mode 100644 index 000000000..c3ce80355 --- /dev/null +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/graph_provider_plugin.hpp @@ -0,0 +1,112 @@ +// Copyright 2026 eclipse0922 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ros2_medkit_gateway/plugins/gateway_plugin.hpp" +#include "ros2_medkit_gateway/providers/introspection_provider.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace ros2_medkit_gateway { + +class PluginContext; +class GatewayNode; + +class GraphProviderPlugin : public GatewayPlugin, public IntrospectionProvider { + public: + struct TopicMetrics { + std::optional frequency_hz; + std::optional latency_ms; + double drop_rate_percent{0.0}; + std::optional expected_frequency_hz; + }; + + struct GraphBuildState { + std::unordered_map topic_metrics; + std::unordered_set stale_topics; + std::unordered_map last_seen_by_app; + bool diagnostics_seen{false}; + }; + + struct GraphBuildConfig { + double expected_frequency_hz_default{30.0}; + double degraded_frequency_ratio{0.5}; + double drop_rate_percent_threshold{5.0}; + }; + + GraphProviderPlugin() = default; + ~GraphProviderPlugin() override = default; + + std::string name() const override; + void configure(const nlohmann::json & config) override; + void set_context(PluginContext & context) override; + void register_routes(httplib::Server & server, const std::string & api_prefix) override; + IntrospectionResult introspect(const IntrospectionInput & input) override; + + static nlohmann::json build_graph_document(const std::string & function_id, const IntrospectionInput & input, + const GraphBuildState & state, const GraphBuildConfig & config, + const std::string & timestamp); + + private: + struct ConfigOverrides { + std::unordered_map by_function; + GraphBuildConfig defaults; + }; + + void subscribe_to_diagnostics(); + void diagnostics_callback(const diagnostic_msgs::msg::DiagnosticArray::ConstSharedPtr & msg); + static std::optional parse_topic_metrics(const diagnostic_msgs::msg::DiagnosticStatus & status); + static std::optional parse_double(const std::string & value); + static std::string generate_fault_code(const std::string & diagnostic_name); + static std::string current_timestamp(); + GraphBuildConfig resolve_config(const std::string & function_id) const; + std::optional get_cached_or_built_graph(const std::string & function_id); + std::optional build_graph_from_entity_cache(const std::string & function_id); + std::unordered_set collect_stale_topics(const std::string & function_id, + const IntrospectionInput & input) const; + GraphBuildState build_state_snapshot(const std::string & function_id, const IntrospectionInput & input, + const std::string & timestamp, bool include_stale_topics = true); + void load_parameters(); + + PluginContext * ctx_{nullptr}; + GatewayNode * gateway_node_{nullptr}; + + // Each mutex protects an independent cache/state bucket; no code path acquires more than one. + mutable std::mutex cache_mutex_; + std::unordered_map graph_cache_; + + mutable std::mutex metrics_mutex_; + std::unordered_map topic_metrics_; + std::deque topic_metrics_order_; + bool diagnostics_seen_{false}; // Guarded by metrics_mutex_. + + mutable std::mutex status_mutex_; + std::unordered_map last_seen_by_app_; + + mutable std::mutex config_mutex_; + ConfigOverrides config_; + + rclcpp::Subscription::SharedPtr diagnostics_sub_; +}; + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/package.xml b/src/ros2_medkit_gateway/package.xml index 234825c94..51a5a4efe 100644 --- a/src/ros2_medkit_gateway/package.xml +++ b/src/ros2_medkit_gateway/package.xml @@ -11,6 +11,7 @@ ament_cmake rclcpp + diagnostic_msgs std_msgs std_srvs sensor_msgs diff --git a/src/ros2_medkit_gateway/src/gateway_node.cpp b/src/ros2_medkit_gateway/src/gateway_node.cpp index 1068b22c7..131f119b4 100644 --- a/src/ros2_medkit_gateway/src/gateway_node.cpp +++ b/src/ros2_medkit_gateway/src/gateway_node.cpp @@ -22,6 +22,7 @@ #include #include "ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp" +#include "ros2_medkit_gateway/plugins/graph_provider_plugin.hpp" using namespace std::chrono_literals; @@ -487,14 +488,15 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") { // Initialize plugin manager plugin_mgr_ = std::make_unique(); plugin_mgr_->set_registries(*sampler_registry_, *transport_registry_); + plugin_mgr_->add_plugin(std::make_unique()); auto plugin_names = get_parameter("plugins").as_string_array(); plugin_names.erase(std::remove_if(plugin_names.begin(), plugin_names.end(), [](const auto & item) { return item.empty(); }), plugin_names.end()); + std::vector configs; if (!plugin_names.empty()) { - std::vector configs; // Plugin name validation: alphanumeric, underscore, hyphen only (max 256 chars) auto is_valid_plugin_name = [](const std::string & name) -> bool { if (name.empty() || name.size() > 256) { @@ -529,21 +531,22 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") { } configs.push_back({pname, path, std::move(plugin_config)}); } - auto loaded = plugin_mgr_->load_plugins(configs); - plugin_mgr_->configure_plugins(); - plugin_ctx_ = make_gateway_plugin_context(this, fault_mgr_.get()); - plugin_mgr_->set_context(*plugin_ctx_); - RCLCPP_INFO(get_logger(), "Loaded %zu plugin(s)", loaded); - - // Register IntrospectionProvider plugins as pipeline layers (hybrid mode only) - if (discovery_mgr_->get_mode() == DiscoveryMode::HYBRID) { - auto providers = plugin_mgr_->get_named_introspection_providers(); - for (auto & [name, provider] : providers) { - discovery_mgr_->add_plugin_layer(name, provider); - } - if (!providers.empty()) { - discovery_mgr_->refresh_pipeline(); - } + } + + auto loaded = plugin_mgr_->load_plugins(configs); + plugin_mgr_->configure_plugins(); + plugin_ctx_ = make_gateway_plugin_context(this, fault_mgr_.get()); + plugin_mgr_->set_context(*plugin_ctx_); + RCLCPP_INFO(get_logger(), "Loaded %zu external plugin(s) and 1 built-in plugin", loaded); + + // Register IntrospectionProvider plugins as pipeline layers (hybrid mode only) + if (discovery_mgr_->get_mode() == DiscoveryMode::HYBRID) { + auto providers = plugin_mgr_->get_named_introspection_providers(); + for (auto & [name, provider] : providers) { + discovery_mgr_->add_plugin_layer(name, provider); + } + if (!providers.empty()) { + discovery_mgr_->refresh_pipeline(); } } diff --git a/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp index 69609bfc1..4149fda16 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp @@ -407,6 +407,8 @@ std::string CyclicSubscriptionHandlers::extract_entity_type(const httplib::Reque return "apps"; case SovdEntityType::COMPONENT: return "components"; + case SovdEntityType::FUNCTION: + return "functions"; default: RCLCPP_WARN(HandlerContext::logger(), "Unexpected entity type in cyclic subscription path: %s", req.path.c_str()); return "apps"; @@ -416,7 +418,7 @@ std::string CyclicSubscriptionHandlers::extract_entity_type(const httplib::Reque tl::expected CyclicSubscriptionHandlers::parse_resource_uri(const std::string & resource) { // Try entity-scoped format first: /api/v1/{entity_type}/{entity_id}/{collection}[/{resource_path}] - static const std::regex entity_regex(R"(^/api/v1/(apps|components)/([^/]+)/([^/]+)(/.*)?$)"); + static const std::regex entity_regex(R"(^/api/v1/(apps|components|functions)/([^/]+)/([^/]+)(/.*)?$)"); std::smatch match; if (std::regex_match(resource, match, entity_regex)) { ParsedResourceUri parsed; @@ -454,7 +456,7 @@ CyclicSubscriptionHandlers::parse_resource_uri(const std::string & resource) { } return tl::make_unexpected( - "Resource URI must match /api/v1/{apps|components}/{id}/{collection}[/{path}] " + "Resource URI must match /api/v1/{apps|components|functions}/{id}/{collection}[/{path}] " "or /api/v1/updates/{id}/status"); } diff --git a/src/ros2_medkit_gateway/src/http/handlers/discovery_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/discovery_handlers.cpp index 69950a3c4..d3141b496 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/discovery_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/discovery_handlers.cpp @@ -994,8 +994,12 @@ void DiscoveryHandlers::handle_get_function(const httplib::Request & req, httpli return; } - auto discovery = ctx_.node()->get_discovery_manager(); - auto func_opt = discovery->get_function(function_id); + const auto & cache = ctx_.node()->get_thread_safe_cache(); + auto func_opt = cache.get_function(function_id); + if (!func_opt) { + auto discovery = ctx_.node()->get_discovery_manager(); + func_opt = discovery->get_function(function_id); + } if (!func_opt) { HandlerContext::send_error(res, 404, ERR_ENTITY_NOT_FOUND, "Function not found", {{"function_id", function_id}}); @@ -1026,10 +1030,12 @@ void DiscoveryHandlers::handle_get_function(const httplib::Request & req, httpli response["faults"] = base_uri + "/faults"; response["logs"] = base_uri + "/logs"; response["bulk-data"] = base_uri + "/bulk-data"; + response["x-medkit-graph"] = base_uri + "/x-medkit-graph"; + response["cyclic-subscriptions"] = base_uri + "/cyclic-subscriptions"; using Cap = CapabilityBuilder::Capability; std::vector caps = {Cap::HOSTS, Cap::DATA, Cap::OPERATIONS, Cap::CONFIGURATIONS, - Cap::FAULTS, Cap::LOGS, Cap::BULK_DATA}; + Cap::FAULTS, Cap::LOGS, Cap::BULK_DATA, Cap::CYCLIC_SUBSCRIPTIONS}; response["capabilities"] = CapabilityBuilder::build_capabilities("functions", func.id, caps); append_plugin_capabilities(response["capabilities"], "functions", func.id, SovdEntityType::FUNCTION, ctx_.node()); diff --git a/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp index ba0fdc85f..b5f0cbae3 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp @@ -215,6 +215,12 @@ void HealthHandlers::handle_root(const httplib::Request & req, httplib::Response "PUT /api/v1/apps/{app_id}/cyclic-subscriptions/{subscription_id}", "DELETE /api/v1/apps/{app_id}/cyclic-subscriptions/{subscription_id}", "GET /api/v1/apps/{app_id}/cyclic-subscriptions/{subscription_id}/events", + "POST /api/v1/functions/{function_id}/cyclic-subscriptions", + "GET /api/v1/functions/{function_id}/cyclic-subscriptions", + "GET /api/v1/functions/{function_id}/cyclic-subscriptions/{subscription_id}", + "PUT /api/v1/functions/{function_id}/cyclic-subscriptions/{subscription_id}", + "DELETE /api/v1/functions/{function_id}/cyclic-subscriptions/{subscription_id}", + "GET /api/v1/functions/{function_id}/cyclic-subscriptions/{subscription_id}/events", // Global Faults "GET /api/v1/faults", "GET /api/v1/faults/stream", diff --git a/src/ros2_medkit_gateway/src/http/rest_server.cpp b/src/ros2_medkit_gateway/src/http/rest_server.cpp index b8b6c8313..a2977e9e0 100644 --- a/src/ros2_medkit_gateway/src/http/rest_server.cpp +++ b/src/ros2_medkit_gateway/src/http/rest_server.cpp @@ -1022,6 +1022,10 @@ void RESTServer::setup_routes() { [this](const httplib::Request & req, httplib::Response & res) { cyclic_sub_handlers_->handle_events(req, res); }); + srv->Get((api_path("/functions") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)/events$)"), + [this](const httplib::Request & req, httplib::Response & res) { + cyclic_sub_handlers_->handle_events(req, res); + }); // Create cyclic subscription srv->Post((api_path("/apps") + R"(/([^/]+)/cyclic-subscriptions$)"), @@ -1032,6 +1036,10 @@ void RESTServer::setup_routes() { [this](const httplib::Request & req, httplib::Response & res) { cyclic_sub_handlers_->handle_create(req, res); }); + srv->Post((api_path("/functions") + R"(/([^/]+)/cyclic-subscriptions$)"), + [this](const httplib::Request & req, httplib::Response & res) { + cyclic_sub_handlers_->handle_create(req, res); + }); // List cyclic subscriptions srv->Get((api_path("/apps") + R"(/([^/]+)/cyclic-subscriptions$)"), @@ -1042,6 +1050,10 @@ void RESTServer::setup_routes() { [this](const httplib::Request & req, httplib::Response & res) { cyclic_sub_handlers_->handle_list(req, res); }); + srv->Get((api_path("/functions") + R"(/([^/]+)/cyclic-subscriptions$)"), + [this](const httplib::Request & req, httplib::Response & res) { + cyclic_sub_handlers_->handle_list(req, res); + }); // Get single subscription srv->Get((api_path("/apps") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)$)"), @@ -1052,6 +1064,10 @@ void RESTServer::setup_routes() { [this](const httplib::Request & req, httplib::Response & res) { cyclic_sub_handlers_->handle_get(req, res); }); + srv->Get((api_path("/functions") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)$)"), + [this](const httplib::Request & req, httplib::Response & res) { + cyclic_sub_handlers_->handle_get(req, res); + }); // Update subscription srv->Put((api_path("/apps") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)$)"), @@ -1062,6 +1078,10 @@ void RESTServer::setup_routes() { [this](const httplib::Request & req, httplib::Response & res) { cyclic_sub_handlers_->handle_update(req, res); }); + srv->Put((api_path("/functions") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)$)"), + [this](const httplib::Request & req, httplib::Response & res) { + cyclic_sub_handlers_->handle_update(req, res); + }); // Delete subscription srv->Delete((api_path("/apps") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)$)"), @@ -1072,6 +1092,10 @@ void RESTServer::setup_routes() { [this](const httplib::Request & req, httplib::Response & res) { cyclic_sub_handlers_->handle_delete(req, res); }); + srv->Delete((api_path("/functions") + R"(/([^/]+)/cyclic-subscriptions/([^/]+)$)"), + [this](const httplib::Request & req, httplib::Response & res) { + cyclic_sub_handlers_->handle_delete(req, res); + }); // === Software Updates (server-level endpoints, REQ_INTEROP_082-085, 091-094) === if (update_handlers_) { diff --git a/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp b/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp index 9691048fc..ef02c363a 100644 --- a/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp +++ b/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp @@ -76,8 +76,13 @@ EntityCapabilities EntityCapabilities::for_type(SovdEntityType type) { // ros2_medkit extension: functions support additional collections via aggregation // (SOVD spec only defines data/operations for functions) caps.collections_ = { - ResourceCollection::DATA, ResourceCollection::OPERATIONS, ResourceCollection::CONFIGURATIONS, - ResourceCollection::FAULTS, ResourceCollection::LOGS, ResourceCollection::BULK_DATA, + ResourceCollection::DATA, + ResourceCollection::OPERATIONS, + ResourceCollection::CONFIGURATIONS, + ResourceCollection::FAULTS, + ResourceCollection::LOGS, + ResourceCollection::BULK_DATA, + ResourceCollection::CYCLIC_SUBSCRIPTIONS, }; caps.aggregated_collections_ = { ResourceCollection::DATA, ResourceCollection::OPERATIONS, ResourceCollection::CONFIGURATIONS, diff --git a/src/ros2_medkit_gateway/src/plugins/graph_provider_plugin.cpp b/src/ros2_medkit_gateway/src/plugins/graph_provider_plugin.cpp new file mode 100644 index 000000000..5a75ca812 --- /dev/null +++ b/src/ros2_medkit_gateway/src/plugins/graph_provider_plugin.cpp @@ -0,0 +1,711 @@ +// Copyright 2026 eclipse0922 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ros2_medkit_gateway/plugins/graph_provider_plugin.hpp" + +#include "ros2_medkit_gateway/fault_manager.hpp" +#include "ros2_medkit_gateway/gateway_node.hpp" +#include "ros2_medkit_gateway/http/error_codes.hpp" +#include "ros2_medkit_gateway/http/http_utils.hpp" +#include "ros2_medkit_gateway/plugins/plugin_context.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ros2_medkit_gateway { + +namespace { + +constexpr size_t kMaxCachedTopicMetrics = 512; + +bool is_filtered_topic_name(const std::string & topic_name) { + if (topic_name == "/parameter_events" || topic_name == "/rosout" || topic_name == "/diagnostics") { + return true; + } + if (topic_name.find("/nitros") != std::string::npos) { + return true; + } + if (topic_name.find("_supported_types") != std::string::npos) { + return true; + } + return false; +} + +struct EdgeBuildResult { + nlohmann::json json; + std::optional ratio; + bool is_error{false}; + bool is_degraded{false}; +}; + +const Function * find_function(const std::vector & functions, const std::string & function_id) { + auto it = std::find_if(functions.begin(), functions.end(), [&](const Function & function) { + return function.id == function_id; + }); + if (it == functions.end()) { + return nullptr; + } + return &(*it); +} + +std::vector resolve_scoped_apps(const std::string & function_id, const IntrospectionInput & input) { + std::vector scoped_apps; + const auto * function = find_function(input.functions, function_id); + if (!function) { + return scoped_apps; + } + + std::unordered_map apps_by_id; + apps_by_id.reserve(input.apps.size()); + for (const auto & app : input.apps) { + apps_by_id.emplace(app.id, &app); + } + + std::unordered_set seen_app_ids; + for (const auto & host : function->hosts) { + auto app_it = apps_by_id.find(host); + if (app_it != apps_by_id.end()) { + if (seen_app_ids.insert(app_it->second->id).second) { + scoped_apps.push_back(app_it->second); + } + continue; + } + + for (const auto & app : input.apps) { + if (app.component_id == host && seen_app_ids.insert(app.id).second) { + scoped_apps.push_back(&app); + } + } + } + + std::sort(scoped_apps.begin(), scoped_apps.end(), [](const App * lhs, const App * rhs) { + return lhs->id < rhs->id; + }); + return scoped_apps; +} + +std::vector filtered_topics(const std::vector & topics) { + std::set unique_topics; + for (const auto & topic : topics) { + if (!is_filtered_topic_name(topic)) { + unique_topics.insert(topic); + } + } + return {unique_topics.begin(), unique_topics.end()}; +} + +std::unordered_set filtered_topic_set(const std::vector & topics) { + auto sorted = filtered_topics(topics); + return {sorted.begin(), sorted.end()}; +} + +std::set collect_unique_topics(const std::vector & apps) { + std::set topics; + for (const auto * app : apps) { + for (const auto & topic : filtered_topics(app->topics.publishes)) { + topics.insert(topic); + } + for (const auto & topic : filtered_topics(app->topics.subscribes)) { + topics.insert(topic); + } + } + return topics; +} + +double resolve_expected_frequency(const GraphProviderPlugin::TopicMetrics * metrics, + const GraphProviderPlugin::GraphBuildConfig & config) { + if (metrics && metrics->expected_frequency_hz.has_value() && *metrics->expected_frequency_hz > 0.0) { + return *metrics->expected_frequency_hz; + } + return config.expected_frequency_hz_default; +} + +EdgeBuildResult build_edge_json(const std::string & edge_id, const App & source, const App & target, + const std::string & topic_name, const std::string & topic_id, + const GraphProviderPlugin::TopicMetrics * metrics, + const GraphProviderPlugin::GraphBuildState & state, + const GraphProviderPlugin::GraphBuildConfig & config) { + EdgeBuildResult result; + auto metrics_json = nlohmann::json{ + {"source", "greenwave_monitor"}, {"frequency_hz", nullptr}, {"latency_ms", nullptr}, + {"drop_rate_percent", 0.0}, {"metrics_status", "pending"}, + }; + + if (metrics) { + if (metrics->frequency_hz.has_value()) { + metrics_json["frequency_hz"] = *metrics->frequency_hz; + } + if (metrics->latency_ms.has_value()) { + metrics_json["latency_ms"] = *metrics->latency_ms; + } + metrics_json["drop_rate_percent"] = metrics->drop_rate_percent; + } + + result.json = {{"edge_id", edge_id}, {"source", source.id}, {"target", target.id}, + {"topic_id", topic_id}, {"transport_type", "unknown"}, {"metrics", metrics_json}}; + + const bool node_offline = !source.is_online || !target.is_online; + const bool topic_stale = state.stale_topics.count(topic_name) > 0; + const bool has_frequency = metrics && metrics->frequency_hz.has_value(); + + if (node_offline) { + result.json["metrics"]["metrics_status"] = "error"; + result.json["metrics"]["error_reason"] = "node_offline"; + result.is_error = true; + return result; + } + + if (topic_stale) { + result.json["metrics"]["metrics_status"] = "error"; + result.json["metrics"]["error_reason"] = "topic_stale"; + result.is_error = true; + return result; + } + + if (has_frequency) { + const double expected_frequency = resolve_expected_frequency(metrics, config); + const double frequency = *metrics->frequency_hz; + result.json["metrics"]["metrics_status"] = "active"; + + if (expected_frequency > 0.0) { + result.ratio = frequency / expected_frequency; + if (*result.ratio < config.degraded_frequency_ratio) { + result.is_degraded = true; + } + } + if (metrics->drop_rate_percent > config.drop_rate_percent_threshold) { + result.is_degraded = true; + } + return result; + } + + if (!state.diagnostics_seen) { + result.json["metrics"]["metrics_status"] = "pending"; + return result; + } + + result.json["metrics"]["metrics_status"] = "error"; + result.json["metrics"]["error_reason"] = "no_data_source"; + result.is_error = true; + return result; +} + +nlohmann::json build_graph_document_for_apps(const std::string & function_id, + const std::vector & scoped_apps, + const GraphProviderPlugin::GraphBuildState & state, + const GraphProviderPlugin::GraphBuildConfig & config, + const std::string & timestamp) { + nlohmann::json graph = {{"schema_version", "1.0.0"}, + {"graph_id", function_id + "-graph"}, + {"timestamp", timestamp}, + {"scope", {{"type", "function"}, {"entity_id", function_id}}}, + {"pipeline_status", "healthy"}, + {"bottleneck_edge", nullptr}, + {"topics", nlohmann::json::array()}, + {"nodes", nlohmann::json::array()}, + {"edges", nlohmann::json::array()}}; + + const auto topic_names = collect_unique_topics(scoped_apps); + + std::unordered_map topic_ids; + size_t topic_index = 0; + for (const auto & topic_name : topic_names) { + const auto topic_id = "topic-" + std::to_string(++topic_index); + topic_ids.emplace(topic_name, topic_id); + graph["topics"].push_back({{"topic_id", topic_id}, {"name", topic_name}}); + } + + for (const auto * app : scoped_apps) { + nlohmann::json node = {{"entity_id", app->id}, {"node_status", app->is_online ? "reachable" : "unreachable"}}; + if (!app->is_online) { + const auto last_seen_it = state.last_seen_by_app.find(app->id); + if (last_seen_it != state.last_seen_by_app.end()) { + node["last_seen"] = last_seen_it->second; + } + } + graph["nodes"].push_back(std::move(node)); + } + + bool has_errors = false; + bool has_degraded = false; + std::optional> bottleneck; + std::string pipeline_status = "healthy"; + size_t edge_index = 0; + + std::unordered_map> subscribes_by_app; + subscribes_by_app.reserve(scoped_apps.size()); + for (const auto * app : scoped_apps) { + subscribes_by_app.emplace(app->id, filtered_topic_set(app->topics.subscribes)); + } + + for (const auto * publisher : scoped_apps) { + for (const auto & topic_name : filtered_topics(publisher->topics.publishes)) { + auto topic_it = topic_ids.find(topic_name); + if (topic_it == topic_ids.end()) { + continue; + } + + for (const auto * subscriber : scoped_apps) { + const auto sub_it = subscribes_by_app.find(subscriber->id); + if (sub_it == subscribes_by_app.end() || sub_it->second.count(topic_name) == 0) { + continue; + } + + const auto edge_id = "edge-" + std::to_string(++edge_index); + const auto metrics_it = state.topic_metrics.find(topic_name); + const GraphProviderPlugin::TopicMetrics * metrics = + metrics_it == state.topic_metrics.end() ? nullptr : &metrics_it->second; + auto edge = + build_edge_json(edge_id, *publisher, *subscriber, topic_name, topic_it->second, metrics, state, config); + + has_errors = has_errors || edge.is_error; + has_degraded = has_degraded || edge.is_degraded; + if (edge.ratio.has_value()) { + if (!bottleneck.has_value() || edge.ratio.value() < bottleneck->second) { + bottleneck = std::make_pair(edge_id, edge.ratio.value()); + } + } + graph["edges"].push_back(std::move(edge.json)); + } + } + } + + if (has_errors) { + pipeline_status = "broken"; + } else if (has_degraded) { + pipeline_status = "degraded"; + } + graph["pipeline_status"] = pipeline_status; + + if (pipeline_status == "degraded" && bottleneck.has_value()) { + graph["bottleneck_edge"] = bottleneck->first; + } + + return nlohmann::json{{"x-medkit-graph", std::move(graph)}}; +} + +} // namespace + +std::string GraphProviderPlugin::name() const { + return "graph-provider"; +} + +void GraphProviderPlugin::configure(const nlohmann::json & /*config*/) { +} + +void GraphProviderPlugin::set_context(PluginContext & context) { + ctx_ = &context; + gateway_node_ = dynamic_cast(ctx_->node()); + ctx_->register_capability(SovdEntityType::FUNCTION, "x-medkit-graph"); + + load_parameters(); + subscribe_to_diagnostics(); + + if (!gateway_node_) { + log_warn("Skipping x-medkit-graph sampler registration: context node is not a GatewayNode"); + return; + } + + auto * sampler_registry = gateway_node_->get_sampler_registry(); + if (!sampler_registry) { + log_warn("Skipping x-medkit-graph sampler registration: sampler registry not available"); + return; + } + + sampler_registry->register_sampler( + "x-medkit-graph", + [this](const std::string & entity_id, + const std::string & /*resource_path*/) -> tl::expected { + auto payload = get_cached_or_built_graph(entity_id); + if (!payload.has_value()) { + return tl::make_unexpected(std::string("Graph snapshot not available: ") + entity_id); + } + return *payload; + }); + log_info("Registered x-medkit-graph cyclic subscription sampler"); +} + +void GraphProviderPlugin::register_routes(httplib::Server & server, const std::string & api_prefix) { + server.Get(api_prefix + R"(/functions/([^/]+)/x-medkit-graph)", + [this](const httplib::Request & req, httplib::Response & res) { + if (!ctx_) { + PluginContext::send_error(res, 503, ERR_SERVICE_UNAVAILABLE, "Graph provider context not initialized"); + return; + } + + const auto function_id = req.matches[1].str(); + auto entity = ctx_->validate_entity_for_route(req, res, function_id); + if (!entity) { + return; + } + + auto payload = get_cached_or_built_graph(function_id); + if (!payload.has_value()) { + PluginContext::send_error(res, 503, ERR_SERVICE_UNAVAILABLE, "Graph snapshot not available", + {{"function_id", function_id}}); + return; + } + + PluginContext::send_json(res, *payload); + }); +} + +IntrospectionResult GraphProviderPlugin::introspect(const IntrospectionInput & input) { + std::unordered_map new_cache; + const auto timestamp = current_timestamp(); + auto state = build_state_snapshot("", input, timestamp, false); + + for (const auto & function : input.functions) { + new_cache.emplace(function.id, + build_graph_document(function.id, input, state, resolve_config(function.id), timestamp)); + } + + { + std::lock_guard lock(cache_mutex_); + graph_cache_.swap(new_cache); + } + + return {}; +} + +nlohmann::json GraphProviderPlugin::build_graph_document(const std::string & function_id, + const IntrospectionInput & input, + const GraphBuildState & state, const GraphBuildConfig & config, + const std::string & timestamp) { + const auto scoped_apps = resolve_scoped_apps(function_id, input); + return build_graph_document_for_apps(function_id, scoped_apps, state, config, timestamp); +} + +void GraphProviderPlugin::subscribe_to_diagnostics() { + if (!ctx_ || !ctx_->node() || diagnostics_sub_) { + return; + } + + diagnostics_sub_ = ctx_->node()->create_subscription( + "/diagnostics", rclcpp::QoS(10), [this](const diagnostic_msgs::msg::DiagnosticArray::ConstSharedPtr & msg) { + diagnostics_callback(msg); + }); + + log_info("Subscribed to /diagnostics for x-medkit-graph metrics"); +} + +void GraphProviderPlugin::diagnostics_callback(const diagnostic_msgs::msg::DiagnosticArray::ConstSharedPtr & msg) { + std::unordered_map updates; + for (const auto & status : msg->status) { + if (is_filtered_topic_name(status.name)) { + continue; + } + auto metrics = parse_topic_metrics(status); + if (metrics.has_value()) { + updates[status.name] = *metrics; + } + } + + std::lock_guard lock(metrics_mutex_); + diagnostics_seen_ = true; + for (auto & [topic_name, metrics] : updates) { + if (topic_metrics_.find(topic_name) == topic_metrics_.end()) { + topic_metrics_order_.push_back(topic_name); + } + topic_metrics_[topic_name] = metrics; + } + while (topic_metrics_order_.size() > kMaxCachedTopicMetrics) { + const auto evicted_topic = topic_metrics_order_.front(); + topic_metrics_order_.pop_front(); + topic_metrics_.erase(evicted_topic); + } +} + +std::optional +GraphProviderPlugin::parse_topic_metrics(const diagnostic_msgs::msg::DiagnosticStatus & status) { + TopicMetrics metrics; + bool has_relevant_key = false; + + for (const auto & kv : status.values) { + if (kv.key == "frame_rate_msg") { + auto value = parse_double(kv.value); + if (value.has_value() && std::isfinite(*value)) { + metrics.frequency_hz = *value; + } + has_relevant_key = true; + } else if (kv.key == "current_delay_from_realtime_ms") { + auto value = parse_double(kv.value); + if (value.has_value() && std::isfinite(*value)) { + metrics.latency_ms = *value; + } + has_relevant_key = true; + } else if (kv.key == "expected_frequency") { + auto value = parse_double(kv.value); + if (value.has_value() && std::isfinite(*value) && *value > 0.0) { + metrics.expected_frequency_hz = *value; + } + has_relevant_key = true; + } else if (kv.key == "drop_rate_percent" || kv.key == "drop_rate") { + auto value = parse_double(kv.value); + if (value.has_value() && std::isfinite(*value) && *value >= 0.0) { + metrics.drop_rate_percent = *value; + } + has_relevant_key = true; + } else if (kv.key == "total_dropped_frames") { + has_relevant_key = true; + } + } + + if (!has_relevant_key) { + return std::nullopt; + } + return metrics; +} + +std::optional GraphProviderPlugin::parse_double(const std::string & value) { + try { + size_t parsed_chars = 0; + auto parsed = std::stod(value, &parsed_chars); + if (parsed_chars != value.size()) { + return std::nullopt; + } + return parsed; + } catch (...) { + return std::nullopt; + } +} + +std::string GraphProviderPlugin::generate_fault_code(const std::string & diagnostic_name) { + std::string result; + result.reserve(diagnostic_name.size()); + + bool last_was_separator = true; + for (char c : diagnostic_name) { + if (std::isalnum(static_cast(c))) { + result += static_cast(std::toupper(static_cast(c))); + last_was_separator = false; + } else if (!last_was_separator) { + result += '_'; + last_was_separator = true; + } + } + + if (!result.empty() && result.back() == '_') { + result.pop_back(); + } + return result; +} + +std::string GraphProviderPlugin::current_timestamp() { + const auto now_ns = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + return format_timestamp_ns(now_ns); +} + +GraphProviderPlugin::GraphBuildConfig GraphProviderPlugin::resolve_config(const std::string & function_id) const { + std::lock_guard lock(config_mutex_); + auto resolved = config_.defaults; + auto it = config_.by_function.find(function_id); + if (it != config_.by_function.end()) { + resolved = it->second; + } + return resolved; +} + +std::optional GraphProviderPlugin::get_cached_or_built_graph(const std::string & function_id) { + // In the real gateway, the merged entity cache is the source of truth. The + // plugin-side cache is populated during the merge pipeline before runtime + // linking finishes, so rebuilding here avoids serving stale node/topic state. + if (gateway_node_) { + auto rebuilt = build_graph_from_entity_cache(function_id); + if (rebuilt.has_value()) { + std::lock_guard lock(cache_mutex_); + graph_cache_[function_id] = *rebuilt; + return rebuilt; + } + } + + { + std::lock_guard lock(cache_mutex_); + auto it = graph_cache_.find(function_id); + if (it != graph_cache_.end()) { + return it->second; + } + } + + return std::nullopt; +} + +std::optional GraphProviderPlugin::build_graph_from_entity_cache(const std::string & function_id) { + if (!gateway_node_) { + return std::nullopt; + } + + IntrospectionInput input; + const auto & cache = gateway_node_->get_thread_safe_cache(); + input.areas = cache.get_areas(); + input.components = cache.get_components(); + input.apps = cache.get_apps(); + input.functions = cache.get_functions(); + + const auto timestamp = current_timestamp(); + // Reuse diagnostics-driven snapshots on the HTTP path instead of blocking a + // request thread on a fault-manager service round-trip for stale-topic data. + auto state = build_state_snapshot(function_id, input, timestamp, false); + const auto scoped_apps = resolve_scoped_apps(function_id, input); + return build_graph_document_for_apps(function_id, scoped_apps, state, resolve_config(function_id), timestamp); +} + +std::unordered_set GraphProviderPlugin::collect_stale_topics(const std::string & function_id, + const IntrospectionInput & input) const { + std::unordered_set stale_topics; + if (!gateway_node_) { + return stale_topics; + } + + auto * fault_manager = gateway_node_->get_fault_manager(); + if (!fault_manager || !fault_manager->is_available()) { + return stale_topics; + } + + std::unordered_map fault_code_to_topic; + for (const auto * app : resolve_scoped_apps(function_id, input)) { + for (const auto & topic_name : filtered_topics(app->topics.publishes)) { + fault_code_to_topic.emplace(generate_fault_code(topic_name), topic_name); + } + for (const auto & topic_name : filtered_topics(app->topics.subscribes)) { + fault_code_to_topic.emplace(generate_fault_code(topic_name), topic_name); + } + } + + auto result = fault_manager->list_faults(""); + if (!result.success || !result.data.contains("faults") || !result.data["faults"].is_array()) { + return stale_topics; + } + + for (const auto & fault : result.data["faults"]) { + if (!fault.contains("fault_code") || !fault.contains("severity") || !fault.contains("status")) { + continue; + } + const auto code_it = fault_code_to_topic.find(fault["fault_code"].get()); + if (code_it == fault_code_to_topic.end()) { + continue; + } + if (fault["status"].get() == ros2_medkit_msgs::msg::Fault::STATUS_CONFIRMED && + fault["severity"].get() == ros2_medkit_msgs::msg::Fault::SEVERITY_CRITICAL) { + stale_topics.insert(code_it->second); + } + } + + return stale_topics; +} + +GraphProviderPlugin::GraphBuildState GraphProviderPlugin::build_state_snapshot(const std::string & function_id, + const IntrospectionInput & input, + const std::string & timestamp, + bool include_stale_topics) { + GraphBuildState state; + { + std::lock_guard lock(metrics_mutex_); + state.topic_metrics = topic_metrics_; + state.diagnostics_seen = diagnostics_seen_; + } + if (include_stale_topics) { + state.stale_topics = collect_stale_topics(function_id, input); + } + + { + std::lock_guard lock(status_mutex_); + for (const auto & app : input.apps) { + if (app.is_online) { + last_seen_by_app_[app.id] = timestamp; + } + } + state.last_seen_by_app = last_seen_by_app_; + } + + return state; +} + +void GraphProviderPlugin::load_parameters() { + if (!ctx_ || !ctx_->node()) { + return; + } + + auto * node = ctx_->node(); + + const auto declare_if_needed = [node](const std::string & name, double default_value) { + if (!node->has_parameter(name)) { + node->declare_parameter(name, default_value); + } + }; + + declare_if_needed("graph_provider.expected_frequency_hz_default", 30.0); + declare_if_needed("graph_provider.degraded_frequency_ratio", 0.5); + declare_if_needed("graph_provider.drop_rate_percent_threshold", 5.0); + + ConfigOverrides new_config; + new_config.defaults.expected_frequency_hz_default = + node->get_parameter("graph_provider.expected_frequency_hz_default").as_double(); + new_config.defaults.degraded_frequency_ratio = + node->get_parameter("graph_provider.degraded_frequency_ratio").as_double(); + new_config.defaults.drop_rate_percent_threshold = + node->get_parameter("graph_provider.drop_rate_percent_threshold").as_double(); + + const auto overrides = node->get_node_parameters_interface()->get_parameter_overrides(); + const std::string prefix = "graph_provider.function_overrides."; + for (const auto & [name, value] : overrides) { + if (name.rfind(prefix, 0) != 0) { + continue; + } + + const auto remainder = name.substr(prefix.size()); + const auto split = remainder.rfind('.'); + if (split == std::string::npos) { + continue; + } + + const auto function_id = remainder.substr(0, split); + const auto field = remainder.substr(split + 1); + + auto [config_it, inserted] = new_config.by_function.emplace(function_id, new_config.defaults); + auto & function_config = config_it->second; + (void)inserted; + + if (value.get_type() != rclcpp::ParameterType::PARAMETER_DOUBLE && + value.get_type() != rclcpp::ParameterType::PARAMETER_INTEGER) { + continue; + } + + const double numeric_value = value.get_type() == rclcpp::ParameterType::PARAMETER_DOUBLE + ? value.get() + : static_cast(value.get()); + + if (field == "expected_frequency_hz") { + function_config.expected_frequency_hz_default = numeric_value; + } else if (field == "degraded_frequency_ratio") { + function_config.degraded_frequency_ratio = numeric_value; + } else if (field == "drop_rate_percent_threshold") { + function_config.drop_rate_percent_threshold = numeric_value; + } + } + + std::lock_guard lock(config_mutex_); + config_ = std::move(new_config); +} + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp b/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp index efb1c659b..1afed7f46 100644 --- a/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp +++ b/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp @@ -67,6 +67,15 @@ TEST(ParseResourceUriTest, VendorExtensionCollection) { EXPECT_EQ(result->resource_path, "/cpu_usage"); } +TEST(ParseResourceUriTest, FunctionVendorExtensionCollection) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/functions/func1/x-medkit-graph"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->entity_type, "functions"); + EXPECT_EQ(result->entity_id, "func1"); + EXPECT_EQ(result->collection, "x-medkit-graph"); + EXPECT_EQ(result->resource_path, ""); +} + TEST(ParseResourceUriTest, MultiSegmentResourcePath) { auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/parent/child/value"); ASSERT_TRUE(result.has_value()); @@ -83,9 +92,13 @@ TEST(ParseResourceUriTest, InvalidMalformedUri) { EXPECT_FALSE(result.has_value()); } -TEST(ParseResourceUriTest, InvalidFunctionEntityType) { +TEST(ParseResourceUriTest, FunctionEntityTypeSupported) { auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/functions/func1/data/topic"); - EXPECT_FALSE(result.has_value()); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->entity_type, "functions"); + EXPECT_EQ(result->entity_id, "func1"); + EXPECT_EQ(result->collection, "data"); + EXPECT_EQ(result->resource_path, "/topic"); } TEST(ParseResourceUriTest, PathTraversalRejected) { diff --git a/src/ros2_medkit_gateway/test/test_entity_resource_model.cpp b/src/ros2_medkit_gateway/test/test_entity_resource_model.cpp index a62fbae44..30e522403 100644 --- a/src/ros2_medkit_gateway/test/test_entity_resource_model.cpp +++ b/src/ros2_medkit_gateway/test/test_entity_resource_model.cpp @@ -184,6 +184,7 @@ TEST(EntityCapabilities, FunctionAggregatesCollections) { EXPECT_TRUE(caps.supports_collection(ResourceCollection::FAULTS)); EXPECT_TRUE(caps.supports_collection(ResourceCollection::LOGS)); EXPECT_TRUE(caps.supports_collection(ResourceCollection::BULK_DATA)); + EXPECT_TRUE(caps.supports_collection(ResourceCollection::CYCLIC_SUBSCRIPTIONS)); EXPECT_TRUE(caps.is_aggregated(ResourceCollection::DATA)); EXPECT_TRUE(caps.is_aggregated(ResourceCollection::OPERATIONS)); EXPECT_TRUE(caps.is_aggregated(ResourceCollection::CONFIGURATIONS)); diff --git a/src/ros2_medkit_gateway/test/test_gateway_node.cpp b/src/ros2_medkit_gateway/test/test_gateway_node.cpp index 9b89fb541..68c86a3a0 100644 --- a/src/ros2_medkit_gateway/test/test_gateway_node.cpp +++ b/src/ros2_medkit_gateway/test/test_gateway_node.cpp @@ -16,11 +16,16 @@ #include // NOLINT(build/include_order) #include +#include +#include #include #include #include +#include #include +#include +#include "ros2_medkit_gateway/discovery/models/function.hpp" #include "ros2_medkit_gateway/gateway_node.hpp" #include "ros2_medkit_gateway/http/http_utils.hpp" @@ -51,6 +56,9 @@ class TestGatewayNode : public ::testing::Test { } void TearDown() override { + for (const auto & path : temp_files_) { + std::remove(path.c_str()); + } node_.reset(); } @@ -75,9 +83,48 @@ class TestGatewayNode : public ::testing::Test { return httplib::Client(server_host_, server_port_); } + std::string write_temp_manifest(const std::string & contents) { + char path_template[] = "/tmp/ros2_medkit_gateway_manifest_XXXXXX"; + int fd = mkstemp(path_template); + EXPECT_GE(fd, 0); + if (fd >= 0) { + close(fd); + } + + std::ofstream out(path_template); + out << contents; + out.close(); + temp_files_.emplace_back(path_template); + return path_template; + } + + void load_function_fixture(const std::string & function_id) { + const std::string manifest = + "manifest_version: \"1.0\"\n" + "functions:\n" + " - id: \"" + + function_id + + "\"\n" + " name: \"Function Test\"\n"; + + ros2_medkit_gateway::DiscoveryConfig config; + config.mode = ros2_medkit_gateway::DiscoveryMode::MANIFEST_ONLY; + config.manifest_path = write_temp_manifest(manifest); + config.manifest_strict_validation = false; + + ASSERT_TRUE(node_->get_discovery_manager()->initialize(config)); + + ros2_medkit_gateway::Function func; + func.id = function_id; + func.name = "Function Test"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_functions({func}); + } + std::shared_ptr node_; std::string server_host_; int server_port_; + std::vector temp_files_; }; TEST_F(TestGatewayNode, test_health_endpoint) { @@ -675,6 +722,51 @@ TEST_F(TestGatewayNode, test_function_nonexistent) { EXPECT_EQ(res->status, 404); } +TEST_F(TestGatewayNode, test_function_detail_includes_cyclic_subscriptions_capability) { + load_function_fixture("graph_func"); + auto client = create_client(); + + auto res = client.Get((std::string(API_BASE_PATH) + "/functions/graph_func").c_str()); + + ASSERT_TRUE(res); + EXPECT_EQ(res->status, 200); + + auto body = nlohmann::json::parse(res->body); + ASSERT_TRUE(body.contains("cyclic-subscriptions")); + EXPECT_EQ(body["cyclic-subscriptions"], "/api/v1/functions/graph_func/cyclic-subscriptions"); + ASSERT_TRUE(body.contains("x-medkit-graph")); + EXPECT_EQ(body["x-medkit-graph"], "/api/v1/functions/graph_func/x-medkit-graph"); + + ASSERT_TRUE(body.contains("capabilities")); + const auto & caps = body["capabilities"]; + auto has_cyclic_subscriptions = std::any_of(caps.begin(), caps.end(), [](const auto & cap) { + return cap.contains("name") && cap["name"] == "cyclic-subscriptions"; + }); + EXPECT_TRUE(has_cyclic_subscriptions); +} + +TEST_F(TestGatewayNode, test_function_cyclic_subscription_create_for_graph_provider) { + load_function_fixture("graph_func"); + auto client = create_client(); + + nlohmann::json body = {{"resource", "/api/v1/functions/graph_func/x-medkit-graph"}, + {"interval", "normal"}, + {"duration", 30}, + {"protocol", "sse"}}; + + auto res = client.Post((std::string(API_BASE_PATH) + "/functions/graph_func/cyclic-subscriptions").c_str(), + body.dump(), "application/json"); + + ASSERT_TRUE(res); + EXPECT_EQ(res->status, 201); + + auto json_response = nlohmann::json::parse(res->body); + EXPECT_EQ(json_response["observed_resource"], "/api/v1/functions/graph_func/x-medkit-graph"); + EXPECT_EQ(json_response["protocol"], "sse"); + EXPECT_EQ(json_response["event_source"], + "/api/v1/functions/graph_func/cyclic-subscriptions/" + json_response["id"].get() + "/events"); +} + TEST_F(TestGatewayNode, test_function_hosts_nonexistent) { auto client = create_client(); diff --git a/src/ros2_medkit_gateway/test/test_graph_provider_plugin.cpp b/src/ros2_medkit_gateway/test/test_graph_provider_plugin.cpp new file mode 100644 index 000000000..aabad2d03 --- /dev/null +++ b/src/ros2_medkit_gateway/test/test_graph_provider_plugin.cpp @@ -0,0 +1,978 @@ +// Copyright 2026 eclipse0922 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ros2_medkit_gateway/discovery/models/app.hpp" +#include "ros2_medkit_gateway/discovery/models/function.hpp" +#include "ros2_medkit_gateway/gateway_node.hpp" +#include "ros2_medkit_gateway/plugins/graph_provider_plugin.hpp" +#include "ros2_medkit_gateway/plugins/plugin_context.hpp" +#include "ros2_medkit_msgs/srv/clear_fault.hpp" +#include "ros2_medkit_msgs/srv/get_fault.hpp" +#include "ros2_medkit_msgs/srv/list_faults.hpp" +#include "ros2_medkit_msgs/srv/report_fault.hpp" + +using namespace std::chrono_literals; +using namespace ros2_medkit_gateway; + +namespace { + +App make_app(const std::string & id, std::vector publishes = {}, std::vector subscribes = {}, + bool is_online = true, const std::string & component_id = "") { + App app; + app.id = id; + app.name = id; + app.component_id = component_id; + app.is_online = is_online; + app.topics.publishes = std::move(publishes); + app.topics.subscribes = std::move(subscribes); + return app; +} + +Function make_function(const std::string & id, std::vector hosts) { + Function func; + func.id = id; + func.name = id; + func.hosts = std::move(hosts); + return func; +} + +IntrospectionInput make_input(std::vector apps, std::vector functions) { + IntrospectionInput input; + input.apps = std::move(apps); + input.functions = std::move(functions); + return input; +} + +GraphProviderPlugin::GraphBuildConfig default_config() { + GraphProviderPlugin::GraphBuildConfig config; + config.expected_frequency_hz_default = 30.0; + config.degraded_frequency_ratio = 0.5; + config.drop_rate_percent_threshold = 5.0; + return config; +} + +GraphProviderPlugin::GraphBuildState make_state(bool diagnostics_seen = false) { + GraphProviderPlugin::GraphBuildState state; + state.diagnostics_seen = diagnostics_seen; + return state; +} + +GraphProviderPlugin::TopicMetrics make_metrics(double frequency_hz, std::optional latency_ms = std::nullopt, + double drop_rate_percent = 0.0, + std::optional expected_frequency_hz = std::nullopt) { + GraphProviderPlugin::TopicMetrics metrics; + metrics.frequency_hz = frequency_hz; + metrics.latency_ms = latency_ms; + metrics.drop_rate_percent = drop_rate_percent; + metrics.expected_frequency_hz = expected_frequency_hz; + return metrics; +} + +diagnostic_msgs::msg::KeyValue make_key_value(const std::string & key, const std::string & value) { + diagnostic_msgs::msg::KeyValue kv; + kv.key = key; + kv.value = value; + return kv; +} + +std::string find_topic_id(const nlohmann::json & graph, const std::string & topic_name) { + for (const auto & topic : graph["topics"]) { + if (topic["name"] == topic_name) { + return topic["topic_id"]; + } + } + return ""; +} + +const nlohmann::json * find_edge(const nlohmann::json & graph, const std::string & source, const std::string & target, + const std::string & topic_name) { + const auto topic_id = find_topic_id(graph, topic_name); + for (const auto & edge : graph["edges"]) { + if (edge["source"] == source && edge["target"] == target && edge["topic_id"] == topic_id) { + return &edge; + } + } + return nullptr; +} + +const nlohmann::json * find_node(const nlohmann::json & graph, const std::string & entity_id) { + for (const auto & node : graph["nodes"]) { + if (node["entity_id"] == entity_id) { + return &node; + } + } + return nullptr; +} + +class FakePluginContext : public PluginContext { + public: + explicit FakePluginContext(std::unordered_map entities, rclcpp::Node * node = nullptr) + : node_(node), entities_(std::move(entities)) { + } + + rclcpp::Node * node() const override { + return node_; + } + + std::optional get_entity(const std::string & id) const override { + auto it = entities_.find(id); + if (it == entities_.end()) { + return std::nullopt; + } + return it->second; + } + + nlohmann::json list_entity_faults(const std::string & /*entity_id*/) const override { + return nlohmann::json::array(); + } + + std::optional validate_entity_for_route(const httplib::Request & /*req*/, httplib::Response & res, + const std::string & entity_id) const override { + auto entity = get_entity(entity_id); + if (!entity) { + send_error(res, 404, "entity-not-found", "Entity not found"); + return std::nullopt; + } + return entity; + } + + void register_capability(SovdEntityType entity_type, const std::string & capability_name) override { + registered_capabilities_[entity_type].push_back(capability_name); + } + + void register_entity_capability(const std::string & entity_id, const std::string & capability_name) override { + entity_capabilities_[entity_id].push_back(capability_name); + } + + std::vector get_type_capabilities(SovdEntityType entity_type) const override { + auto it = registered_capabilities_.find(entity_type); + if (it == registered_capabilities_.end()) { + return {}; + } + return it->second; + } + + std::vector get_entity_capabilities(const std::string & entity_id) const override { + auto it = entity_capabilities_.find(entity_id); + if (it == entity_capabilities_.end()) { + return {}; + } + return it->second; + } + + private: + rclcpp::Node * node_{nullptr}; + std::unordered_map entities_; + std::map> registered_capabilities_; + std::unordered_map> entity_capabilities_; +}; + +class LocalHttpServer { + public: + LocalHttpServer() = default; + + ~LocalHttpServer() { + stop(); + } + + void start(httplib::Server & server) { + server_ = &server; + port_ = server.bind_to_any_port("127.0.0.1"); + ASSERT_GT(port_, 0); + thread_ = std::thread([&server]() { + server.listen_after_bind(); + }); + std::this_thread::sleep_for(50ms); + } + + void stop() { + if (thread_.joinable() && server_) { + server_->stop(); + thread_.join(); + } + } + + int port() const { + return port_; + } + + private: + httplib::Server * server_{nullptr}; + std::thread thread_; + int port_{-1}; +}; + +class GraphProviderPluginRosTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + + static void TearDownTestSuite() { + if (rclcpp::ok()) { + rclcpp::shutdown(); + } + } +}; + +} // namespace + +TEST(GraphProviderPluginBuildTest, BuildsThreeNodeChain) { + auto input = make_input( + {make_app("a", {"/a_to_b"}, {}), make_app("b", {"/b_to_c"}, {"/a_to_b"}), make_app("c", {}, {"/b_to_c"})}, + {make_function("fn", {"a", "b", "c"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + ASSERT_EQ(graph["nodes"].size(), 3u); + ASSERT_EQ(graph["topics"].size(), 2u); + ASSERT_EQ(graph["edges"].size(), 2u); + const auto * edge_ab = find_edge(graph, "a", "b", "/a_to_b"); + const auto * edge_bc = find_edge(graph, "b", "c", "/b_to_c"); + ASSERT_NE(edge_ab, nullptr); + ASSERT_NE(edge_bc, nullptr); + EXPECT_EQ((*edge_ab)["topic_id"], find_topic_id(graph, "/a_to_b")); + EXPECT_EQ((*edge_bc)["topic_id"], find_topic_id(graph, "/b_to_c")); +} + +TEST(GraphProviderPluginBuildTest, BuildsFanOutEdgesSharingTopicId) { + auto input = + make_input({make_app("a", {"/shared"}, {}), make_app("b", {}, {"/shared"}), make_app("c", {}, {"/shared"})}, + {make_function("fn", {"a", "b", "c"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + ASSERT_EQ(graph["edges"].size(), 2u); + const auto * edge_ab = find_edge(graph, "a", "b", "/shared"); + const auto * edge_ac = find_edge(graph, "a", "c", "/shared"); + ASSERT_NE(edge_ab, nullptr); + ASSERT_NE(edge_ac, nullptr); + EXPECT_EQ((*edge_ab)["topic_id"], (*edge_ac)["topic_id"]); +} + +TEST(GraphProviderPluginBuildTest, BuildsFanInEdgesSharingTopicId) { + auto input = + make_input({make_app("a", {"/shared"}, {}), make_app("b", {"/shared"}, {}), make_app("c", {}, {"/shared"})}, + {make_function("fn", {"a", "b", "c"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + ASSERT_EQ(graph["edges"].size(), 2u); + const auto * edge_ac = find_edge(graph, "a", "c", "/shared"); + const auto * edge_bc = find_edge(graph, "b", "c", "/shared"); + ASSERT_NE(edge_ac, nullptr); + ASSERT_NE(edge_bc, nullptr); + EXPECT_EQ((*edge_ac)["topic_id"], (*edge_bc)["topic_id"]); +} + +TEST(GraphProviderPluginBuildTest, BuildsSelfLoopEdgeForAppPublishingAndSubscribingSameTopic) { + auto input = make_input({make_app("a", {"/loop"}, {"/loop"})}, {make_function("fn", {"a"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + ASSERT_EQ(graph["topics"].size(), 1u); + ASSERT_EQ(graph["edges"].size(), 1u); + const auto * edge = find_edge(graph, "a", "a", "/loop"); + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["topic_id"], find_topic_id(graph, "/loop")); +} + +TEST(GraphProviderPluginBuildTest, FiltersInfrastructureAndNitrosTopics) { + auto input = make_input( + {make_app("a", {"/rosout", "/parameter_events", "/diagnostics", "/camera/nitros", "/real"}, {}), + make_app("b", {}, {"/rosout", "/parameter_events", "/diagnostics", "/camera/_supported_types", "/real"})}, + {make_function("fn", {"a", "b"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + ASSERT_EQ(graph["topics"].size(), 1u); + EXPECT_EQ(graph["topics"][0]["name"], "/real"); + ASSERT_EQ(graph["edges"].size(), 1u); + EXPECT_NE(find_edge(graph, "a", "b", "/real"), nullptr); +} + +TEST(GraphProviderPluginBuildTest, MarksReachableAndUnreachableNodes) { + auto input = make_input({make_app("online", {"/topic"}, {}, true), make_app("offline", {}, {"/topic"}, false)}, + {make_function("fn", {"online", "offline"})}); + auto state = make_state(); + state.last_seen_by_app["offline"] = "2026-03-08T11:59:00.000Z"; + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * online_node = find_node(graph, "online"); + const auto * offline_node = find_node(graph, "offline"); + + ASSERT_EQ(graph["nodes"].size(), 2u); + ASSERT_NE(online_node, nullptr); + ASSERT_NE(offline_node, nullptr); + EXPECT_EQ((*online_node)["node_status"], "reachable"); + EXPECT_FALSE(online_node->contains("last_seen")); + EXPECT_EQ((*offline_node)["node_status"], "unreachable"); + EXPECT_TRUE(offline_node->contains("last_seen")); + EXPECT_EQ((*offline_node)["last_seen"], "2026-03-08T11:59:00.000Z"); +} + +TEST(GraphProviderPluginBuildTest, ReturnsEmptyGraphForNoApps) { + auto input = make_input({}, {make_function("fn", {})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + EXPECT_EQ(graph["schema_version"], "1.0.0"); + EXPECT_EQ(graph["graph_id"], "fn-graph"); + EXPECT_EQ(graph["scope"]["type"], "function"); + EXPECT_EQ(graph["scope"]["entity_id"], "fn"); + EXPECT_TRUE(graph["topics"].empty()); + EXPECT_TRUE(graph["nodes"].empty()); + EXPECT_TRUE(graph["edges"].empty()); +} + +TEST(GraphProviderPluginBuildTest, ExpandsComponentHostsToScopedApps) { + auto input = make_input({make_app("a", {"/topic"}, {}, true, "comp1"), make_app("b", {}, {"/topic"}, true, "comp1"), + make_app("c", {}, {"/topic"}, true, "comp2")}, + {make_function("fn", {"comp1"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + ASSERT_EQ(graph["nodes"].size(), 2u); + EXPECT_NE(find_node(graph, "a"), nullptr); + EXPECT_NE(find_node(graph, "b"), nullptr); + EXPECT_EQ(find_node(graph, "c"), nullptr); + EXPECT_NE(find_edge(graph, "a", "b", "/topic"), nullptr); +} + +TEST(GraphProviderPluginMetricsTest, MarksActiveEdgeWhenGreenwaveMetricsExist) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + auto state = make_state(true); + state.topic_metrics["/topic"] = make_metrics(29.8, 1.2, 0.0, 30.0); + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["source"], "greenwave_monitor"); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "active"); + EXPECT_DOUBLE_EQ((*edge)["metrics"]["frequency_hz"], 29.8); + EXPECT_DOUBLE_EQ((*edge)["metrics"]["latency_ms"], 1.2); + EXPECT_DOUBLE_EQ((*edge)["metrics"]["drop_rate_percent"], 0.0); + EXPECT_FALSE((*edge)["metrics"].contains("error_reason")); + EXPECT_FALSE(edge->contains("error_reason")); + EXPECT_EQ(graph["pipeline_status"], "healthy"); +} + +TEST(GraphProviderPluginMetricsTest, MarksPendingWhenDiagnosticsHaveNotArrivedYet) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(false), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "pending"); + EXPECT_FALSE((*edge)["metrics"].contains("error_reason")); + EXPECT_FALSE(edge->contains("error_reason")); +} + +TEST(GraphProviderPluginMetricsTest, MarksTopicStaleErrorsFromFaultState) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + auto state = make_state(true); + state.stale_topics.insert("/topic"); + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "error"); + ASSERT_TRUE((*edge)["metrics"].contains("error_reason")); + EXPECT_EQ((*edge)["metrics"]["error_reason"], "topic_stale"); + EXPECT_FALSE(edge->contains("error_reason")); + EXPECT_EQ(graph["pipeline_status"], "broken"); +} + +TEST(GraphProviderPluginMetricsTest, MarksNodeOfflineErrors) { + auto input = make_input({make_app("a", {"/topic"}, {}, true), make_app("b", {}, {"/topic"}, false)}, + {make_function("fn", {"a", "b"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(true), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "error"); + ASSERT_TRUE((*edge)["metrics"].contains("error_reason")); + EXPECT_EQ((*edge)["metrics"]["error_reason"], "node_offline"); + EXPECT_FALSE(edge->contains("error_reason")); + EXPECT_EQ(graph["pipeline_status"], "broken"); +} + +TEST(GraphProviderPluginMetricsTest, MarksNoDataSourceErrorsWhenDiagnosticsArePresentButTopicIsMissing) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(true), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "error"); + ASSERT_TRUE((*edge)["metrics"].contains("error_reason")); + EXPECT_EQ((*edge)["metrics"]["error_reason"], "no_data_source"); + EXPECT_FALSE(edge->contains("error_reason")); + EXPECT_EQ(graph["pipeline_status"], "broken"); +} + +TEST(GraphProviderPluginMetricsTest, KeepsBrokenPipelineBottleneckNullWhenNoFrequencyExists) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + + auto doc = GraphProviderPlugin::build_graph_document("fn", input, make_state(true), default_config(), + "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + EXPECT_EQ(graph["pipeline_status"], "broken"); + EXPECT_TRUE(graph["bottleneck_edge"].is_null()); +} + +TEST(GraphProviderPluginMetricsTest, MarksPipelineDegradedWhenFrequencyDropsBelowThreshold) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + auto state = make_state(true); + state.topic_metrics["/topic"] = make_metrics(10.0, 5.0, 0.0, 30.0); + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "active"); + EXPECT_EQ(graph["pipeline_status"], "degraded"); + EXPECT_EQ(graph["bottleneck_edge"], (*edge)["edge_id"]); +} + +TEST(GraphProviderPluginMetricsTest, MarksPipelineDegradedWhenDropRateExceedsThreshold) { + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + auto state = make_state(true); + state.topic_metrics["/topic"] = make_metrics(30.0, 1.0, 7.5, 30.0); + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + + ASSERT_NE(edge, nullptr); + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "active"); + EXPECT_DOUBLE_EQ((*edge)["metrics"]["drop_rate_percent"], 7.5); + EXPECT_EQ(graph["pipeline_status"], "degraded"); + EXPECT_EQ(graph["bottleneck_edge"], (*edge)["edge_id"]); +} + +TEST(GraphProviderPluginMetricsTest, ChoosesSlowestEdgeAsBottleneck) { + auto input = make_input({make_app("a", {"/ab", "/ac"}, {}), make_app("b", {}, {"/ab"}), make_app("c", {}, {"/ac"})}, + {make_function("fn", {"a", "b", "c"})}); + auto state = make_state(true); + state.topic_metrics["/ab"] = make_metrics(25.0, 1.0, 0.0, 30.0); + state.topic_metrics["/ac"] = make_metrics(5.0, 1.0, 0.0, 30.0); + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + const auto * slower_edge = find_edge(graph, "a", "c", "/ac"); + + ASSERT_NE(slower_edge, nullptr); + EXPECT_EQ(graph["bottleneck_edge"], (*slower_edge)["edge_id"]); +} + +TEST(GraphProviderPluginRouteTest, ServesFunctionGraphFromCachedSnapshot) { + GraphProviderPlugin plugin; + FakePluginContext ctx({{"fn", PluginEntityInfo{SovdEntityType::FUNCTION, "fn", "", ""}}}); + plugin.set_context(ctx); + + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + plugin.introspect(input); + + httplib::Server server; + plugin.register_routes(server, "/api/v1"); + + LocalHttpServer local_server; + local_server.start(server); + + httplib::Client client("127.0.0.1", local_server.port()); + auto res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + for (int attempt = 0; !res && attempt < 19; ++attempt) { + std::this_thread::sleep_for(50ms); + res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + } + + ASSERT_TRUE(res); + ASSERT_EQ(res->status, 200); + + auto body = nlohmann::json::parse(res->body); + ASSERT_TRUE(body.contains("x-medkit-graph")); + EXPECT_EQ(body["x-medkit-graph"]["schema_version"], "1.0.0"); + EXPECT_EQ(body["x-medkit-graph"]["scope"]["type"], "function"); + EXPECT_EQ(body["x-medkit-graph"]["scope"]["entity_id"], "fn"); + EXPECT_TRUE(body["x-medkit-graph"].contains("timestamp")); + + local_server.stop(); +} + +TEST(GraphProviderPluginRouteTest, RegistersFunctionCapabilityOnContext) { + GraphProviderPlugin plugin; + FakePluginContext ctx({{"fn", PluginEntityInfo{SovdEntityType::FUNCTION, "fn", "", ""}}}); + + plugin.set_context(ctx); + + const auto caps = ctx.get_type_capabilities(SovdEntityType::FUNCTION); + ASSERT_EQ(caps.size(), 1u); + EXPECT_EQ(caps[0], "x-medkit-graph"); +} + +TEST(GraphProviderPluginRouteTest, UsesPreviousOnlineTimestampForOfflineLastSeen) { + GraphProviderPlugin plugin; + FakePluginContext ctx({{"fn", PluginEntityInfo{SovdEntityType::FUNCTION, "fn", "", ""}}}); + plugin.set_context(ctx); + + auto online_input = make_input({make_app("node1", {}, {}, true)}, {make_function("fn", {"node1"})}); + plugin.introspect(online_input); + + std::this_thread::sleep_for(5ms); + + auto offline_input = make_input({make_app("node1", {}, {}, false)}, {make_function("fn", {"node1"})}); + plugin.introspect(offline_input); + + httplib::Server server; + plugin.register_routes(server, "/api/v1"); + + LocalHttpServer local_server; + local_server.start(server); + + httplib::Client client("127.0.0.1", local_server.port()); + auto res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + for (int attempt = 0; !res && attempt < 19; ++attempt) { + std::this_thread::sleep_for(50ms); + res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + } + + ASSERT_TRUE(res); + ASSERT_EQ(res->status, 200); + + auto body = nlohmann::json::parse(res->body); + const auto & graph = body["x-medkit-graph"]; + const auto * node = find_node(graph, "node1"); + ASSERT_NE(node, nullptr); + ASSERT_TRUE(node->contains("last_seen")); + EXPECT_LT((*node)["last_seen"].get(), graph["timestamp"].get()); + + local_server.stop(); +} + +TEST_F(GraphProviderPluginRosTest, PrefersMergedGatewayEntityCacheOverStalePluginSnapshot) { + auto gateway_node = std::make_shared(); + const auto server_host = gateway_node->get_parameter("server.host").as_string(); + const auto server_port = static_cast(gateway_node->get_parameter("server.port").as_int()); + + { + const auto start = std::chrono::steady_clock::now(); + const auto timeout = std::chrono::seconds(5); + httplib::Client health_client(server_host, server_port); + while (std::chrono::steady_clock::now() - start < timeout) { + if (auto health = health_client.Get("/api/v1/health")) { + if (health->status == 200) { + break; + } + } + std::this_thread::sleep_for(50ms); + } + + GraphProviderPlugin * plugin = nullptr; + for (auto * provider : gateway_node->get_plugin_manager()->get_introspection_providers()) { + plugin = dynamic_cast(provider); + if (plugin) { + break; + } + } + ASSERT_NE(plugin, nullptr); + + auto stale_input = + make_input({make_app("a", {}, {}, false), make_app("b", {}, {}, false)}, {make_function("fn", {"a", "b"})}); + plugin->introspect(stale_input); + + auto fresh_input = make_input({make_app("a", {"/topic"}, {}, true), make_app("b", {}, {"/topic"}, true)}, + {make_function("fn", {"a", "b"})}); + + auto & cache = const_cast(gateway_node->get_thread_safe_cache()); + cache.update_apps(fresh_input.apps); + cache.update_functions(fresh_input.functions); + + httplib::Client client(server_host, server_port); + auto res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + for (int attempt = 0; !res && attempt < 19; ++attempt) { + std::this_thread::sleep_for(50ms); + res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + } + + ASSERT_TRUE(res); + ASSERT_EQ(res->status, 200); + + auto body = nlohmann::json::parse(res->body); + const auto & graph = body["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + const auto * node_a = find_node(graph, "a"); + const auto * node_b = find_node(graph, "b"); + + ASSERT_NE(edge, nullptr); + ASSERT_NE(node_a, nullptr); + ASSERT_NE(node_b, nullptr); + EXPECT_EQ((*node_a)["node_status"], "reachable"); + EXPECT_EQ((*node_b)["node_status"], "reachable"); + EXPECT_EQ(graph["edges"].size(), 1u); + } + + gateway_node.reset(); +} + +TEST_F(GraphProviderPluginRosTest, IntrospectDoesNotQueryFaultManagerServiceOnGatewayThread) { + auto service_node = std::make_shared("graph_faults_service_node"); + std::atomic list_fault_calls{0}; + auto report_fault_service = service_node->create_service( + "/fault_manager/report_fault", [](const std::shared_ptr /*request*/, + std::shared_ptr response) { + response->accepted = true; + }); + auto get_fault_service = service_node->create_service( + "/fault_manager/get_fault", [](const std::shared_ptr /*request*/, + std::shared_ptr response) { + response->success = false; + response->error_message = "not found"; + }); + auto list_faults_service = service_node->create_service( + "/fault_manager/list_faults", + [&list_fault_calls](const std::shared_ptr /*request*/, + std::shared_ptr response) { + ++list_fault_calls; + response->faults.clear(); + }); + auto clear_fault_service = service_node->create_service( + "/fault_manager/clear_fault", [](const std::shared_ptr /*request*/, + std::shared_ptr response) { + response->success = true; + response->message = "cleared"; + }); + + auto gateway_node = std::make_shared(); + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(service_node); + executor.add_node(gateway_node); + std::thread spin_thread([&executor]() { + executor.spin(); + }); + + auto * plugin = [&]() -> GraphProviderPlugin * { + for (auto * provider : gateway_node->get_plugin_manager()->get_introspection_providers()) { + auto * graph_provider = dynamic_cast(provider); + if (graph_provider) { + return graph_provider; + } + } + return nullptr; + }(); + ASSERT_NE(plugin, nullptr); + + const auto deadline = std::chrono::steady_clock::now() + 3s; + while (std::chrono::steady_clock::now() < deadline && !gateway_node->get_fault_manager()->is_available()) { + std::this_thread::sleep_for(50ms); + } + ASSERT_TRUE(gateway_node->get_fault_manager()->is_available()); + + list_fault_calls.store(0); + + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + plugin->introspect(input); + + EXPECT_EQ(list_fault_calls.load(), 0); + + executor.cancel(); + spin_thread.join(); + executor.remove_node(gateway_node); + executor.remove_node(service_node); + gateway_node.reset(); + clear_fault_service.reset(); + list_faults_service.reset(); + get_fault_service.reset(); + report_fault_service.reset(); + service_node.reset(); +} + +TEST_F(GraphProviderPluginRosTest, HttpGraphRequestDoesNotQueryFaultManagerService) { + auto service_node = std::make_shared("graph_route_faults_service_node"); + std::atomic list_fault_calls{0}; + auto report_fault_service = service_node->create_service( + "/fault_manager/report_fault", [](const std::shared_ptr /*request*/, + std::shared_ptr response) { + response->accepted = true; + }); + auto get_fault_service = service_node->create_service( + "/fault_manager/get_fault", [](const std::shared_ptr /*request*/, + std::shared_ptr response) { + response->success = false; + response->error_message = "not found"; + }); + auto list_faults_service = service_node->create_service( + "/fault_manager/list_faults", + [&list_fault_calls](const std::shared_ptr /*request*/, + std::shared_ptr response) { + ++list_fault_calls; + response->faults.clear(); + }); + auto clear_fault_service = service_node->create_service( + "/fault_manager/clear_fault", [](const std::shared_ptr /*request*/, + std::shared_ptr response) { + response->success = true; + response->message = "cleared"; + }); + + auto gateway_node = std::make_shared(); + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(service_node); + executor.add_node(gateway_node); + std::thread spin_thread([&executor]() { + executor.spin(); + }); + + const auto deadline = std::chrono::steady_clock::now() + 3s; + while (std::chrono::steady_clock::now() < deadline && !gateway_node->get_fault_manager()->is_available()) { + std::this_thread::sleep_for(50ms); + } + ASSERT_TRUE(gateway_node->get_fault_manager()->is_available()); + + auto & cache = const_cast(gateway_node->get_thread_safe_cache()); + cache.update_apps({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}); + cache.update_functions({make_function("fn", {"a", "b"})}); + + const auto server_host = gateway_node->get_parameter("server.host").as_string(); + const auto server_port = static_cast(gateway_node->get_parameter("server.port").as_int()); + + httplib::Client client(server_host, server_port); + auto res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + for (int attempt = 0; !res && attempt < 19; ++attempt) { + std::this_thread::sleep_for(50ms); + res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + } + + ASSERT_TRUE(res); + ASSERT_EQ(res->status, 200); + EXPECT_EQ(list_fault_calls.load(), 0); + + executor.cancel(); + spin_thread.join(); + executor.remove_node(gateway_node); + executor.remove_node(service_node); + gateway_node.reset(); + clear_fault_service.reset(); + list_faults_service.reset(); + get_fault_service.reset(); + report_fault_service.reset(); + service_node.reset(); +} + +TEST(GraphProviderPluginMetricsTest, BrokenPipelineHasNullBottleneckEdgeEvenWhenActiveEdgesExist) { + // edge a→b: /ab, b is offline → node_offline error → broken + // edge a→c: /ac, both online, freq=5 (below 30*0.5) → active but degraded + // pipeline_status should be "broken", bottleneck_edge should be null + auto input = make_input( + {make_app("a", {"/ab", "/ac"}, {}, true), make_app("b", {}, {"/ab"}, false), make_app("c", {}, {"/ac"}, true)}, + {make_function("fn", {"a", "b", "c"})}); + auto state = make_state(true); + state.topic_metrics["/ac"] = make_metrics(5.0, 1.0, 0.0, 30.0); + + auto doc = + GraphProviderPlugin::build_graph_document("fn", input, state, default_config(), "2026-03-08T12:00:00.000Z"); + const auto & graph = doc["x-medkit-graph"]; + + EXPECT_EQ(graph["pipeline_status"], "broken"); + EXPECT_TRUE(graph["bottleneck_edge"].is_null()); +} + +TEST_F(GraphProviderPluginRosTest, SetsDiagnosticsSeenWhenNonGreenwaveMessageArrives) { + auto node = std::make_shared("graph_diag_seen_test_node"); + auto pub_node = std::make_shared("graph_diag_seen_pub"); + auto diagnostics_pub = pub_node->create_publisher("/diagnostics", 10); + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + executor.add_node(pub_node); + + { + GraphProviderPlugin plugin; + FakePluginContext ctx({{"fn", PluginEntityInfo{SovdEntityType::FUNCTION, "fn", "", ""}}}, node.get()); + plugin.set_context(ctx); + + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + + // Publish a diagnostic message with no greenwave keys (non-greenwave hardware diagnostics) + diagnostic_msgs::msg::DiagnosticArray msg; + diagnostic_msgs::msg::DiagnosticStatus status; + status.name = "/some/hardware/driver"; + status.values = {make_key_value("temperature", "72.3"), make_key_value("voltage", "3.3")}; + msg.status.push_back(status); + diagnostics_pub->publish(msg); + executor.spin_some(); + std::this_thread::sleep_for(20ms); + executor.spin_some(); + + plugin.introspect(input); + + httplib::Server server; + plugin.register_routes(server, "/api/v1"); + LocalHttpServer local_server; + local_server.start(server); + + httplib::Client client("127.0.0.1", local_server.port()); + auto res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + for (int attempt = 0; !res && attempt < 19; ++attempt) { + std::this_thread::sleep_for(50ms); + res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + } + + ASSERT_TRUE(res); + ASSERT_EQ(res->status, 200); + + auto body = nlohmann::json::parse(res->body); + const auto & graph = body["x-medkit-graph"]; + const auto * edge = find_edge(graph, "a", "b", "/topic"); + ASSERT_NE(edge, nullptr); + // /diagnostics is alive (non-greenwave message received), but /topic has no greenwave entry + // → must be "error"/"no_data_source", NOT "pending" + EXPECT_EQ((*edge)["metrics"]["metrics_status"], "error"); + EXPECT_EQ((*edge)["metrics"]["error_reason"], "no_data_source"); + + local_server.stop(); + } + + executor.remove_node(pub_node); + executor.remove_node(node); + pub_node.reset(); + node.reset(); +} + +TEST_F(GraphProviderPluginRosTest, AppliesPerFunctionConfigOverridesFromNodeParameters) { + rclcpp::NodeOptions options; + options.append_parameter_override("graph_provider.function_overrides.fn.drop_rate_percent_threshold", 1.0); + auto node = std::make_shared("graph_provider_test_node", options); + auto publisher_node = std::make_shared("graph_provider_test_pub"); + auto diagnostics_pub = publisher_node->create_publisher("/diagnostics", 10); + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + executor.add_node(publisher_node); + + { + GraphProviderPlugin plugin; + FakePluginContext ctx({{"fn", PluginEntityInfo{SovdEntityType::FUNCTION, "fn", "", ""}}}, node.get()); + plugin.set_context(ctx); + + auto input = + make_input({make_app("a", {"/topic"}, {}), make_app("b", {}, {"/topic"})}, {make_function("fn", {"a", "b"})}); + + diagnostic_msgs::msg::DiagnosticArray msg; + diagnostic_msgs::msg::DiagnosticStatus status; + status.name = "/topic"; + status.values = {make_key_value("frame_rate_msg", "30.0"), make_key_value("current_delay_from_realtime_ms", "1.0"), + make_key_value("expected_frequency", "30.0"), make_key_value("drop_rate_percent", "2.0")}; + msg.status.push_back(status); + diagnostics_pub->publish(msg); + executor.spin_some(); + std::this_thread::sleep_for(20ms); + executor.spin_some(); + + plugin.introspect(input); + + httplib::Server server; + plugin.register_routes(server, "/api/v1"); + + LocalHttpServer local_server; + local_server.start(server); + + httplib::Client client("127.0.0.1", local_server.port()); + auto res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + for (int attempt = 0; !res && attempt < 19; ++attempt) { + std::this_thread::sleep_for(50ms); + res = client.Get("/api/v1/functions/fn/x-medkit-graph"); + } + + ASSERT_TRUE(res); + ASSERT_EQ(res->status, 200); + + auto body = nlohmann::json::parse(res->body); + const auto & graph = body["x-medkit-graph"]; + EXPECT_EQ(graph["pipeline_status"], "degraded"); + + const auto * edge = find_edge(graph, "a", "b", "/topic"); + ASSERT_NE(edge, nullptr); + EXPECT_DOUBLE_EQ((*edge)["metrics"]["drop_rate_percent"], 2.0); + + local_server.stop(); + } + + executor.remove_node(publisher_node); + executor.remove_node(node); + publisher_node.reset(); + node.reset(); +} diff --git a/src/ros2_medkit_integration_tests/test/features/test_discovery_legacy_mode.test.py b/src/ros2_medkit_integration_tests/test/features/test_discovery_legacy_mode.test.py index 12b640d46..69e41c7d5 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_discovery_legacy_mode.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_discovery_legacy_mode.test.py @@ -60,10 +60,12 @@ class TestLegacyDiscoveryMode(GatewayTestCase): def test_each_node_has_own_component(self): """Each node should become its own Component (no synthetic grouping).""" + required_nodes = ('temp_sensor', 'rpm_sensor', 'pressure_sensor') data = self.poll_endpoint_until( '/components', - lambda d: d if any( - 'temp_sensor' in c['id'] for c in d.get('items', []) + lambda d: d if all( + any(node_name in c['id'] for c in d.get('items', [])) + for node_name in required_nodes ) else None, timeout=60.0, ) @@ -79,6 +81,10 @@ def test_each_node_has_own_component(self): any('rpm_sensor' in cid for cid in component_ids), f"rpm_sensor not found in components: {component_ids}", ) + self.assertTrue( + any('pressure_sensor' in cid for cid in component_ids), + f"pressure_sensor not found in components: {component_ids}", + ) def test_no_synthetic_namespace_components(self): """No synthetic components from namespace grouping should exist."""