Skip to content
8 changes: 8 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,19 @@ pub async fn start_local_surfnet(
.await
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PluginInfo {
pub plugin_name: String,
pub uuid: String,
}

#[derive(Debug)]
pub enum PluginManagerCommand {
LoadConfig(Uuid, PluginConfig, Sender<String>),
UnloadPlugin(Uuid, Sender<Result<(), String>>),
ReloadPlugin(Uuid, PluginConfig, Sender<String>),
ListPlugins(Sender<Vec<PluginInfo>>),
}

#[cfg(test)]
Expand Down
67 changes: 25 additions & 42 deletions crates/core/src/rpc/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use txtx_addon_network_svm_types::subgraph::PluginConfig;
use uuid::Uuid;

use super::{RunloopContext, not_implemented_err, not_implemented_err_async};
use crate::{PluginManagerCommand, rpc::State};
use crate::{PluginInfo, PluginManagerCommand, rpc::State};

#[rpc]
pub trait AdminRpc {
Expand Down Expand Up @@ -126,7 +126,9 @@ pub trait AdminRpc {
/// loaded into the runtime. It can be useful for debugging or operational monitoring.
///
/// ## Returns
/// - `Vec<String>` — A list of plugin names currently active in the system.
/// - `Vec<PluginInfo>` — A list of plugin information objects, each containing:
/// - `plugin_name`: The name of the plugin (e.g., "surfpool-subgraph")
/// - `uuid`: The unique identifier of the plugin instance
///
/// ## Example Request (JSON-RPC)
/// ```json
Expand All @@ -142,7 +144,12 @@ pub trait AdminRpc {
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "result": ["tx_filter", "custom_logger"],
/// "result": [
/// {
/// "plugin_name": "surfpool-subgraph",
/// "uuid": "550e8400-e29b-41d4-a716-446655440000"
/// }
/// ],
/// "id": 103
/// }
/// ```
Expand All @@ -151,40 +158,7 @@ pub trait AdminRpc {
/// - Only plugins that have been successfully loaded will appear in this list.
/// - This method is read-only and safe to call frequently.
#[rpc(meta, name = "listPlugins")]
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;

/// Returns the address of the RPC server.
///
/// This RPC method retrieves the network address (IP and port) the RPC server is currently
/// listening on. It can be useful for service discovery or monitoring the server’s network status.
///
/// ## Returns
/// - `Option<SocketAddr>` — The network address of the RPC server, or `None` if no address is available.
///
/// ## Example Request (JSON-RPC)
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "id": 104,
/// "method": "rpcAddress",
/// "params": []
/// }
/// ```
///
/// ## Example Response
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "result": "127.0.0.1:8080",
/// "id": 104
/// }
/// ```
///
/// # Notes
/// - This method is useful for finding the address of a running RPC server, especially in dynamic environments.
/// - If the server is not configured or is running without network exposure, the result may be `None`.
#[rpc(meta, name = "rpcAddress")]
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<PluginInfo>>>;

/// Returns the system start time.
///
Expand Down Expand Up @@ -851,12 +825,21 @@ impl AdminRpc for SurfpoolAdminRpc {
Box::pin(async move { Ok(endpoint_url) })
}

fn list_plugins(&self, _meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
not_implemented_err_async("list_plugins")
}
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<PluginInfo>>> {
let Some(ctx) = meta else {
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
};

let (tx, rx) = crossbeam_channel::bounded(1);
let _ = ctx
.plugin_manager_commands_tx
.send(PluginManagerCommand::ListPlugins(tx));

let Ok(plugin_list) = rx.recv_timeout(Duration::from_secs(10)) else {
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
};

fn rpc_addr(&self, _meta: Self::Metadata) -> Result<Option<SocketAddr>> {
not_implemented_err("rpc_addr")
Box::pin(async move { Ok(plugin_list) })
}

fn start_time(&self, meta: Self::Metadata) -> Result<String> {
Expand Down
41 changes: 25 additions & 16 deletions crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,8 @@ fn start_geyser_runloop(

let mut surfpool_plugin_manager: Vec<Box<dyn GeyserPlugin>> = vec![];

// Map between each plugin's UUID to its position (index) in the surfpool_plugin_manager Vec.
// Allows for easier reload/unload
let mut plugin_uuid_map: HashMap<crate::Uuid, usize> = HashMap::new();
// Map between each plugin's UUID to its entry (index, plugin_name)
let mut plugin_map: HashMap<crate::Uuid, (usize, String)> = HashMap::new();

#[cfg(feature = "geyser_plugin")]
for plugin_config_path in plugin_config_paths.into_iter() {
Expand Down Expand Up @@ -481,7 +480,7 @@ fn start_geyser_runloop(
config: txtx_addon_network_svm_types::subgraph::PluginConfig,
notifier: crossbeam_channel::Sender<String>,
surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
plugin_uuid_map: &mut HashMap<uuid::Uuid, usize>,
plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
indexing_enabled: &mut bool|
-> Result<(), String> {
let _ = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(
Expand Down Expand Up @@ -518,7 +517,7 @@ fn start_geyser_runloop(
let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
let plugin_index = surfpool_plugin_manager.len();
surfpool_plugin_manager.push(plugin);
plugin_uuid_map.insert(uuid, plugin_index);
plugin_map.insert(uuid, (plugin_index, config.plugin_name.to_string()));

Ok(())
};
Expand All @@ -527,12 +526,13 @@ fn start_geyser_runloop(
#[cfg(feature = "subgraph")]
let unload_plugin_by_uuid = |uuid: uuid::Uuid,
surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
plugin_uuid_map: &mut HashMap<uuid::Uuid, usize>,
plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
indexing_enabled: &mut bool|
-> Result<(), String> {
let plugin_index = *plugin_uuid_map
let plugin_index = plugin_map
.get(&uuid)
.ok_or_else(|| format!("Plugin {} not found", uuid))?;
.ok_or_else(|| format!("Plugin {} not found", uuid))?
.0;

if plugin_index >= surfpool_plugin_manager.len() {
return Err(format!("Plugin index {} out of bounds", plugin_index));
Expand All @@ -546,12 +546,12 @@ fn start_geyser_runloop(

// Remove from tracking structures
surfpool_plugin_manager.remove(plugin_index);
plugin_uuid_map.remove(&uuid);
plugin_map.remove(&uuid);

// Adjust indices after removal
for (_, idx) in plugin_uuid_map.iter_mut() {
if *idx > plugin_index {
*idx -= 1;
for (index, _) in plugin_map.values_mut() {
if *index > plugin_index {
*index -= 1;
}
}

Expand Down Expand Up @@ -579,7 +579,7 @@ fn start_geyser_runloop(
}
#[cfg(feature = "subgraph")]
PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled) {
if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load plugin: {}", e)));
}
}
Expand All @@ -589,7 +589,7 @@ fn start_geyser_runloop(
}
#[cfg(feature = "subgraph")]
PluginManagerCommand::UnloadPlugin(uuid, notifier) => {
let result = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled);
let result = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled);
let _ = notifier.send(result);
}
#[cfg(not(feature = "subgraph"))]
Expand All @@ -599,18 +599,27 @@ fn start_geyser_runloop(
#[cfg(feature = "subgraph")]
PluginManagerCommand::ReloadPlugin(uuid, config, notifier) => {
// Unload the old plugin
if let Err(e) = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled) {
if let Err(e) = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
let _ = simnet_events_tx.try_send(SimnetEvent::error(format!("Failed to unload plugin during reload: {}", e)));
continue;
}

let _ = simnet_events_tx.try_send(SimnetEvent::info(format!("Unloaded plugin with UUID - {}", uuid)));

// Load the new plugin with the same UUID
if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled) {
if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
let _ = simnet_events_tx.try_send(SimnetEvent::error(format!("Failed to reload plugin: {}", e)));
}
}
PluginManagerCommand::ListPlugins(notifier) => {
let plugin_list: Vec<crate::PluginInfo> = plugin_map.iter().map(|(uuid, (_, plugin_name))| {
crate::PluginInfo {
plugin_name: plugin_name.clone(),
uuid: uuid.to_string(),
}
}).collect();
let _ = notifier.send(plugin_list);
}
}
},
Err(e) => {
Expand Down
1 change: 1 addition & 0 deletions surfpool-examples
Submodule surfpool-examples added at ec8097
Loading