Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 70 additions & 6 deletions docs/api/rest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,75 @@ Functions
``GET /api/v1/functions/{function_id}/hosts``
List apps that host this function.

``GET /api/v1/functions/{function_id}/x-medkit-graph``
Get a function-scoped topology snapshot with per-topic metrics and pipeline status.

**Example Response:**

.. code-block:: json

{
"x-medkit-graph": {
"schema_version": "1.0.0",
"graph_id": "perception_graph-graph",
"timestamp": "2026-03-08T12:00:00.000Z",
"scope": {
"type": "function",
"entity_id": "perception_graph"
},
"pipeline_status": "degraded",
"bottleneck_edge": "edge-2",
"topics": [
{
"topic_id": "topic-1",
"name": "/camera/front/image_raw"
}
],
"nodes": [
{
"entity_id": "camera_front",
"node_status": "reachable"
},
{
"entity_id": "detector",
"node_status": "unreachable",
"last_seen": "2026-03-08T11:59:42.100Z"
}
],
"edges": [
{
"edge_id": "edge-2",
"source": "camera_front",
"target": "detector",
"topic_id": "topic-1",
"transport_type": "unknown",
"metrics": {
"source": "greenwave_monitor",
"frequency_hz": 12.5,
"latency_ms": 4.2,
"drop_rate_percent": 0.0,
"metrics_status": "active"
}
}
]
}
}

**Field Notes:**

- ``pipeline_status``: overall graph state, one of ``healthy``, ``degraded``, ``broken``
- ``node_status``: per-node reachability, one of ``reachable``, ``unreachable``
- ``metrics_status``: per-edge telemetry state, one of ``pending``, ``active``, ``error``
- ``error_reason``: present when ``metrics_status`` is ``error``; one of ``node_offline``, ``topic_stale``, ``no_data_source``

.. note::

**ros2_medkit extension:** Functions support resource collections beyond the SOVD spec.
``/data`` and ``/operations`` aggregate from hosted apps (per SOVD). Additionally,
``/configurations``, ``/faults``, ``/logs`` aggregate from hosts, and read-only
``/bulk-data`` is available. See :ref:`sovd-compliance` for details.
``/configurations``, ``/faults``, ``/logs`` aggregate from hosts, read-only
``/bulk-data`` is available, ``/cyclic-subscriptions`` is supported, and
the vendor resource ``/x-medkit-graph`` exposes a function-scoped graph snapshot.
See :ref:`sovd-compliance` for details.

Data Endpoints
--------------
Expand Down Expand Up @@ -1005,7 +1068,7 @@ Subscriptions are temporary - they do not survive server restart.
- ``faults`` - Fault list (resource path optional, e.g. ``/faults`` or ``/faults/fault_001``)
- ``configurations`` - Parameter values (resource path optional)
- ``logs`` - Application log entries from ``/rosout``
- ``x-*`` - Vendor extensions (e.g. ``x-medkit-metrics``)
- ``x-*`` - Vendor extensions (e.g. ``x-medkit-graph``)

**Interval values:**

Expand All @@ -1016,7 +1079,7 @@ Subscriptions are temporary - they do not survive server restart.
``POST /api/v1/{entity_type}/{entity_id}/cyclic-subscriptions``
Create a new cyclic subscription.

**Applies to:** ``/apps``, ``/components``
**Applies to:** ``/apps``, ``/components``, ``/functions``

**Request Body:**

Expand All @@ -1032,7 +1095,8 @@ Subscriptions are temporary - they do not survive server restart.
**Fields:**

- ``resource`` (string, required): Full SOVD resource URI to observe
(e.g. ``/api/v1/apps/{id}/data/{topic}``, ``/api/v1/apps/{id}/faults``)
(e.g. ``/api/v1/apps/{id}/data/{topic}``, ``/api/v1/apps/{id}/faults``,
``/api/v1/functions/{id}/x-medkit-graph``)
- ``protocol`` (string, optional): Transport protocol. Only ``"sse"`` supported. Default: ``"sse"``
- ``interval`` (string, required): One of ``fast``, ``normal``, ``slow``
- ``duration`` (integer, required): Subscription lifetime in seconds.
Expand Down Expand Up @@ -1371,7 +1435,7 @@ extends this to areas and functions where aggregation makes practical sense:
- \-
- yes
- yes
- \-
- yes
- apps, components

Other extensions beyond SOVD:
Expand Down
9 changes: 9 additions & 0 deletions src/ros2_medkit_gateway/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ endif()
# Find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(diagnostic_msgs REQUIRED)
find_package(std_msgs REQUIRED)
find_package(std_srvs REQUIRED)
find_package(sensor_msgs REQUIRED)
Expand Down Expand Up @@ -147,10 +148,12 @@ add_library(gateway_lib STATIC
src/plugins/plugin_context.cpp
src/plugins/plugin_loader.cpp
src/plugins/plugin_manager.cpp
src/plugins/graph_provider_plugin.cpp
)

medkit_target_dependencies(gateway_lib
rclcpp
diagnostic_msgs
std_msgs
std_srvs
rcl_interfaces
Expand Down Expand Up @@ -337,6 +340,11 @@ if(BUILD_TESTING)
ament_add_gtest(test_merge_pipeline test/test_merge_pipeline.cpp)
target_link_libraries(test_merge_pipeline gateway_lib)

# Add graph provider plugin tests
ament_add_gtest(test_graph_provider_plugin test/test_graph_provider_plugin.cpp)
target_link_libraries(test_graph_provider_plugin gateway_lib)
set_tests_properties(test_graph_provider_plugin PROPERTIES ENVIRONMENT "ROS_DOMAIN_ID=70")

# Add capability builder tests
ament_add_gtest(test_capability_builder test/test_capability_builder.cpp)
target_link_libraries(test_capability_builder gateway_lib)
Expand Down Expand Up @@ -516,6 +524,7 @@ if(BUILD_TESTING)
test_log_manager
test_log_handlers
test_merge_pipeline
test_graph_provider_plugin
test_runtime_linker
)
foreach(_target ${_test_targets})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2026 eclipse0922
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ros2_medkit_gateway/plugins/gateway_plugin.hpp"
#include "ros2_medkit_gateway/providers/introspection_provider.hpp"

#include <diagnostic_msgs/msg/diagnostic_array.hpp>
#include <rclcpp/rclcpp.hpp>

#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

namespace ros2_medkit_gateway {

class PluginContext;
class GatewayNode;

class GraphProviderPlugin : public GatewayPlugin, public IntrospectionProvider {
public:
struct TopicMetrics {
std::optional<double> frequency_hz;
std::optional<double> latency_ms;
double drop_rate_percent{0.0};
std::optional<double> expected_frequency_hz;
};

struct GraphBuildState {
std::unordered_map<std::string, TopicMetrics> topic_metrics;
std::unordered_set<std::string> stale_topics;
std::unordered_map<std::string, std::string> last_seen_by_app;
bool diagnostics_seen{false};
};

struct GraphBuildConfig {
double expected_frequency_hz_default{30.0};
double degraded_frequency_ratio{0.5};
double drop_rate_percent_threshold{5.0};
};

GraphProviderPlugin() = default;
~GraphProviderPlugin() override = default;

std::string name() const override;
void configure(const nlohmann::json & config) override;
void set_context(PluginContext & context) override;
void register_routes(httplib::Server & server, const std::string & api_prefix) override;
IntrospectionResult introspect(const IntrospectionInput & input) override;

static nlohmann::json build_graph_document(const std::string & function_id, const IntrospectionInput & input,
const GraphBuildState & state, const GraphBuildConfig & config,
const std::string & timestamp);

private:
struct ConfigOverrides {
std::unordered_map<std::string, GraphBuildConfig> by_function;
GraphBuildConfig defaults;
};

void subscribe_to_diagnostics();
void diagnostics_callback(const diagnostic_msgs::msg::DiagnosticArray::ConstSharedPtr & msg);
static std::optional<TopicMetrics> parse_topic_metrics(const diagnostic_msgs::msg::DiagnosticStatus & status);
static std::optional<double> parse_double(const std::string & value);
static std::string generate_fault_code(const std::string & diagnostic_name);
static std::string current_timestamp();
GraphBuildConfig resolve_config(const std::string & function_id) const;
std::optional<nlohmann::json> get_cached_or_built_graph(const std::string & function_id);
std::optional<nlohmann::json> build_graph_from_entity_cache(const std::string & function_id);
std::unordered_set<std::string> collect_stale_topics(const std::string & function_id,
const IntrospectionInput & input) const;
GraphBuildState build_state_snapshot(const std::string & function_id, const IntrospectionInput & input,
const std::string & timestamp, bool include_stale_topics = true);
void load_parameters();

PluginContext * ctx_{nullptr};
GatewayNode * gateway_node_{nullptr};

// Each mutex protects an independent cache/state bucket; no code path acquires more than one.
mutable std::mutex cache_mutex_;
std::unordered_map<std::string, nlohmann::json> graph_cache_;

mutable std::mutex metrics_mutex_;
std::unordered_map<std::string, TopicMetrics> topic_metrics_;
std::deque<std::string> topic_metrics_order_;
bool diagnostics_seen_{false}; // Guarded by metrics_mutex_.

mutable std::mutex status_mutex_;
std::unordered_map<std::string, std::string> last_seen_by_app_;

mutable std::mutex config_mutex_;
ConfigOverrides config_;

rclcpp::Subscription<diagnostic_msgs::msg::DiagnosticArray>::SharedPtr diagnostics_sub_;
};

} // namespace ros2_medkit_gateway
1 change: 1 addition & 0 deletions src/ros2_medkit_gateway/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<buildtool_depend>ament_cmake</buildtool_depend>

<depend>rclcpp</depend>
<depend>diagnostic_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>sensor_msgs</depend>
Expand Down
35 changes: 19 additions & 16 deletions src/ros2_medkit_gateway/src/gateway_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unordered_set>

#include "ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp"
#include "ros2_medkit_gateway/plugins/graph_provider_plugin.hpp"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -487,14 +488,15 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") {
// Initialize plugin manager
plugin_mgr_ = std::make_unique<PluginManager>();
plugin_mgr_->set_registries(*sampler_registry_, *transport_registry_);
plugin_mgr_->add_plugin(std::make_unique<GraphProviderPlugin>());
auto plugin_names = get_parameter("plugins").as_string_array();
plugin_names.erase(std::remove_if(plugin_names.begin(), plugin_names.end(),
[](const auto & item) {
return item.empty();
}),
plugin_names.end());
std::vector<PluginConfig> configs;
if (!plugin_names.empty()) {
std::vector<PluginConfig> configs;
// Plugin name validation: alphanumeric, underscore, hyphen only (max 256 chars)
auto is_valid_plugin_name = [](const std::string & name) -> bool {
if (name.empty() || name.size() > 256) {
Expand Down Expand Up @@ -529,21 +531,22 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") {
}
configs.push_back({pname, path, std::move(plugin_config)});
}
auto loaded = plugin_mgr_->load_plugins(configs);
plugin_mgr_->configure_plugins();
plugin_ctx_ = make_gateway_plugin_context(this, fault_mgr_.get());
plugin_mgr_->set_context(*plugin_ctx_);
RCLCPP_INFO(get_logger(), "Loaded %zu plugin(s)", loaded);

// Register IntrospectionProvider plugins as pipeline layers (hybrid mode only)
if (discovery_mgr_->get_mode() == DiscoveryMode::HYBRID) {
auto providers = plugin_mgr_->get_named_introspection_providers();
for (auto & [name, provider] : providers) {
discovery_mgr_->add_plugin_layer(name, provider);
}
if (!providers.empty()) {
discovery_mgr_->refresh_pipeline();
}
}

auto loaded = plugin_mgr_->load_plugins(configs);
plugin_mgr_->configure_plugins();
plugin_ctx_ = make_gateway_plugin_context(this, fault_mgr_.get());
plugin_mgr_->set_context(*plugin_ctx_);
RCLCPP_INFO(get_logger(), "Loaded %zu external plugin(s) and 1 built-in plugin", loaded);

// Register IntrospectionProvider plugins as pipeline layers (hybrid mode only)
if (discovery_mgr_->get_mode() == DiscoveryMode::HYBRID) {
auto providers = plugin_mgr_->get_named_introspection_providers();
for (auto & [name, provider] : providers) {
discovery_mgr_->add_plugin_layer(name, provider);
}
if (!providers.empty()) {
discovery_mgr_->refresh_pipeline();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ std::string CyclicSubscriptionHandlers::extract_entity_type(const httplib::Reque
return "apps";
case SovdEntityType::COMPONENT:
return "components";
case SovdEntityType::FUNCTION:
return "functions";
default:
RCLCPP_WARN(HandlerContext::logger(), "Unexpected entity type in cyclic subscription path: %s", req.path.c_str());
return "apps";
Expand All @@ -416,7 +418,7 @@ std::string CyclicSubscriptionHandlers::extract_entity_type(const httplib::Reque
tl::expected<ParsedResourceUri, std::string>
CyclicSubscriptionHandlers::parse_resource_uri(const std::string & resource) {
// Try entity-scoped format first: /api/v1/{entity_type}/{entity_id}/{collection}[/{resource_path}]
static const std::regex entity_regex(R"(^/api/v1/(apps|components)/([^/]+)/([^/]+)(/.*)?$)");
static const std::regex entity_regex(R"(^/api/v1/(apps|components|functions)/([^/]+)/([^/]+)(/.*)?$)");
std::smatch match;
if (std::regex_match(resource, match, entity_regex)) {
ParsedResourceUri parsed;
Expand Down Expand Up @@ -454,7 +456,7 @@ CyclicSubscriptionHandlers::parse_resource_uri(const std::string & resource) {
}

return tl::make_unexpected(
"Resource URI must match /api/v1/{apps|components}/{id}/{collection}[/{path}] "
"Resource URI must match /api/v1/{apps|components|functions}/{id}/{collection}[/{path}] "
"or /api/v1/updates/{id}/status");
}

Expand Down
12 changes: 9 additions & 3 deletions src/ros2_medkit_gateway/src/http/handlers/discovery_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,12 @@ void DiscoveryHandlers::handle_get_function(const httplib::Request & req, httpli
return;
}

auto discovery = ctx_.node()->get_discovery_manager();
auto func_opt = discovery->get_function(function_id);
const auto & cache = ctx_.node()->get_thread_safe_cache();
auto func_opt = cache.get_function(function_id);
if (!func_opt) {
auto discovery = ctx_.node()->get_discovery_manager();
func_opt = discovery->get_function(function_id);
}

if (!func_opt) {
HandlerContext::send_error(res, 404, ERR_ENTITY_NOT_FOUND, "Function not found", {{"function_id", function_id}});
Expand Down Expand Up @@ -1026,10 +1030,12 @@ void DiscoveryHandlers::handle_get_function(const httplib::Request & req, httpli
response["faults"] = base_uri + "/faults";
response["logs"] = base_uri + "/logs";
response["bulk-data"] = base_uri + "/bulk-data";
response["x-medkit-graph"] = base_uri + "/x-medkit-graph";
response["cyclic-subscriptions"] = base_uri + "/cyclic-subscriptions";

using Cap = CapabilityBuilder::Capability;
std::vector<Cap> caps = {Cap::HOSTS, Cap::DATA, Cap::OPERATIONS, Cap::CONFIGURATIONS,
Cap::FAULTS, Cap::LOGS, Cap::BULK_DATA};
Cap::FAULTS, Cap::LOGS, Cap::BULK_DATA, Cap::CYCLIC_SUBSCRIPTIONS};
response["capabilities"] = CapabilityBuilder::build_capabilities("functions", func.id, caps);
append_plugin_capabilities(response["capabilities"], "functions", func.id, SovdEntityType::FUNCTION, ctx_.node());

Expand Down
Loading
Loading