Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
33c5665
* Refactor to remove Backend trait in favor of impl Service. Improve…
chrivers May 10, 2025
05fe785
+ Add new ServiceName struct for svc crate, to handle upcomming serv…
chrivers May 13, 2025
3284817
+ Add conversion functions for ServiceName
chrivers May 13, 2025
c56b712
* Replace all string-based service naming with ServiceName struct, t…
chrivers May 13, 2025
466ec79
+ Derive Debug, Clone, Copy for policy::{Retry, Policy}
chrivers May 13, 2025
d07f29f
* Rework "Service" trait to avoid associated const SIGNAL_STOP. Inst…
chrivers May 13, 2025
49494e3
+ Add SvcError::ServiceGeneration error variant, for service generators
chrivers May 17, 2025
a7aeded
- Remove obsolete specializations for Service impl on Future
chrivers May 17, 2025
12731df
* Formatting
chrivers May 17, 2025
cef3537
+ Add BoxDynService type alias, and impl Service for it through indi…
chrivers May 17, 2025
1f61898
+ Add ServiceTemplate trait, and ErrorAdapter impl, to later impleme…
chrivers May 17, 2025
dc7d954
+ Add Service::boxed() helper function, to wrap Service into Box<dyn…
chrivers May 17, 2025
644130b
+ Impl StandardService.boxed() helper function
chrivers May 17, 2025
387b44c
* Make ServiceName roundtrip serde in "service@instance" form as str…
chrivers May 17, 2025
36368c2
+ Impl constructors on ServiceName and ServiceId
chrivers May 17, 2025
8a5360f
+ Implement service template for z2m backend service
chrivers May 17, 2025
4e0a525
+ Add support for registering service templates in ServiceManager, a…
chrivers May 17, 2025
123b0dc
* Convert z2m services to use instancing
chrivers May 17, 2025
162bdcb
* Fix svc exports for "manager" feature
chrivers May 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions crates/bifrost-api/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
use svc::traits::ServiceState;
use uuid::Uuid;

use svc::serviceid::ServiceName;
use svc::traits::ServiceState;

use crate::Client;
use crate::error::BifrostResult;

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Service {
pub id: Uuid,
pub name: String,
pub name: ServiceName,
pub state: ServiceState,
}

Expand All @@ -24,12 +26,12 @@ impl Client {
self.get("service").await
}

pub async fn service_stop(&self, id: Uuid) -> BifrostResult<()> {
pub async fn service_stop(&self, id: Uuid) -> BifrostResult<Uuid> {
self.put(&format!("service/{id}"), ServiceState::Stopped)
.await
}

pub async fn service_start(&self, id: Uuid) -> BifrostResult<()> {
pub async fn service_start(&self, id: Uuid) -> BifrostResult<Uuid> {
self.put(&format!("service/{id}"), ServiceState::Running)
.await
}
Expand Down
13 changes: 11 additions & 2 deletions crates/svc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::error::Error;
use thiserror::Error;

use crate::manager::{ServiceEvent, SvmRequest};
use crate::serviceid::ServiceId;
use crate::serviceid::{ServiceId, ServiceName};
use crate::traits::ServiceState;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -43,13 +43,22 @@ pub enum SvcError {
ServiceNotFound(ServiceId),

#[error("Service {0} already exists")]
ServiceAlreadyExists(String),
ServiceAlreadyExists(ServiceName),

#[error("All services stopped")]
Shutdown,

#[error("Service has failed")]
ServiceFailed,

#[error("Templated service generation failed")]
ServiceGeneration(Box<dyn Error + Send>),
}

impl SvcError {
pub fn generation(err: impl Error + Send + 'static) -> Self {
Self::ServiceGeneration(Box::new(err))
}
}

#[derive(Error, Debug)]
Expand Down
2 changes: 2 additions & 0 deletions crates/svc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ pub mod manager;
pub mod rpc;
#[cfg(feature = "manager")]
pub mod runservice;
#[cfg(feature = "manager")]
pub mod template;
98 changes: 74 additions & 24 deletions crates/svc/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ use uuid::Uuid;
use crate::error::{RunSvcError, SvcError, SvcResult};
use crate::rpc::RpcRequest;
use crate::runservice::StandardService;
use crate::serviceid::{IntoServiceId, ServiceId};
use crate::serviceid::{IntoServiceId, ServiceId, ServiceName};
use crate::template::ServiceTemplate;
use crate::traits::{Service, ServiceRunner, ServiceState};

#[derive(Debug)]
pub struct ServiceInstance {
tx: watch::Sender<ServiceState>,
name: String,
name: ServiceName,
state: ServiceState,
abort_handle: AbortHandle,
}
Expand Down Expand Up @@ -60,13 +61,14 @@ impl ServiceEvent {

/// A request to a [`ServiceManager`]
pub enum SvmRequest {
Stop(RpcRequest<ServiceId, SvcResult<()>>),
Start(RpcRequest<ServiceId, SvcResult<()>>),
Stop(RpcRequest<ServiceId, SvcResult<Uuid>>),
Start(RpcRequest<ServiceId, SvcResult<Uuid>>),
Status(RpcRequest<ServiceId, SvcResult<ServiceState>>),
List(RpcRequest<(), Vec<(Uuid, String)>>),
List(RpcRequest<(), Vec<(Uuid, ServiceName)>>),
Resolve(RpcRequest<ServiceId, SvcResult<Uuid>>),
LookupName(RpcRequest<ServiceId, SvcResult<String>>),
LookupName(RpcRequest<ServiceId, SvcResult<ServiceName>>),
Register(RpcRequest<(String, ServiceFunc), SvcResult<Uuid>>),
RegisterTemplate(RpcRequest<(String, Box<dyn ServiceTemplate>), SvcResult<()>>),
Subscribe(RpcRequest<mpsc::UnboundedSender<ServiceEvent>, SvcResult<Uuid>>),
Shutdown(RpcRequest<(), ()>),
}
Expand Down Expand Up @@ -128,19 +130,29 @@ impl SvmClient {
.await?
}

pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult<()> {
pub async fn register_template(
&mut self,
name: impl AsRef<str>,
generator: impl ServiceTemplate + 'static,
) -> SvcResult<()> {
let name = name.as_ref().to_string();
self.rpc(SvmRequest::RegisterTemplate, (name, Box::new(generator)))
.await?
}

pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
self.rpc(SvmRequest::Start, id.service_id()).await?
}

pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult<()> {
pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
self.rpc(SvmRequest::Stop, id.service_id()).await?
}

pub async fn resolve(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
self.rpc(SvmRequest::Resolve, id.service_id()).await?
}

pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult<String> {
pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult<ServiceName> {
self.rpc(SvmRequest::LookupName, id.service_id()).await?
}

Expand Down Expand Up @@ -197,7 +209,7 @@ impl SvmClient {
self.rpc(SvmRequest::Status, id.service_id()).await?
}

pub async fn list(&mut self) -> SvcResult<Vec<(Uuid, String)>> {
pub async fn list(&mut self) -> SvcResult<Vec<(Uuid, ServiceName)>> {
self.rpc(SvmRequest::List, ()).await
}

Expand All @@ -214,6 +226,10 @@ impl Debug for SvmRequest {
Self::Status(arg0) => f.debug_tuple("Status").field(arg0).finish(),
Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(),
Self::Register(_arg0) => f.debug_tuple("Register").field(&"<service>").finish(),
Self::RegisterTemplate(_arg0) => f
.debug_tuple("RegisterTemplate")
.field(&"<service>")
.finish(),
Self::Resolve(arg0) => f.debug_tuple("Resolve").field(arg0).finish(),
Self::LookupName(arg0) => f.debug_tuple("ResolveName").field(arg0).finish(),
Self::Subscribe(_arg0) => f.debug_tuple("Subscribe").finish(),
Expand All @@ -229,8 +245,9 @@ pub struct ServiceManager {
service_tx: mpsc::UnboundedSender<ServiceEvent>,
subscribers: BTreeMap<Uuid, mpsc::UnboundedSender<ServiceEvent>>,
svcs: BTreeMap<Uuid, ServiceInstance>,
names: BTreeMap<String, Uuid>,
names: BTreeMap<ServiceName, Uuid>,
tasks: JoinSet<Result<(), RunSvcError>>,
templates: BTreeMap<String, Box<dyn ServiceTemplate>>,
shutdown: bool,
}

Expand All @@ -254,6 +271,7 @@ impl ServiceManager {
svcs: BTreeMap::new(),
names: BTreeMap::new(),
tasks: JoinSet::new(),
templates: BTreeMap::new(),
shutdown: false,
}
}
Expand Down Expand Up @@ -284,8 +302,7 @@ impl ServiceManager {
self.control_tx.clone()
}

fn register(&mut self, name: &str, svc: ServiceFunc) -> SvcResult<Uuid> {
let name = name.to_string();
fn register(&mut self, name: ServiceName, svc: ServiceFunc) -> SvcResult<Uuid> {
if self.names.contains_key(&name) {
return Err(SvcError::ServiceAlreadyExists(name));
}
Expand All @@ -297,7 +314,7 @@ impl ServiceManager {

let rec = ServiceInstance {
tx,
name: name.to_string(),
name: name.clone(),
state: ServiceState::Registered,
abort_handle,
};
Expand All @@ -317,7 +334,7 @@ impl ServiceManager {
match &id {
ServiceId::Name(name) => self
.names
.get(name.as_str())
.get(name)
.ok_or_else(|| SvcError::ServiceNotFound(id))
.copied(),
ServiceId::Id(uuid) => {
Expand Down Expand Up @@ -351,23 +368,49 @@ impl ServiceManager {
Ok(&self.svcs[&id])
}

fn start(&self, id: impl IntoServiceId) -> SvcResult<()> {
self.get(&id).and_then(|svc| {
fn start(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
let id = id.service_id();

// if the service is known, attempt to start it
if let Ok(svc) = self.get(&id) {
log::debug!("Starting service: {id} {}", &svc.name);
Ok(svc.tx.send(ServiceState::Running)?)
})
svc.tx.send(ServiceState::Running)?;
return self.resolve(&id);
}

// ..else, check if it's a named instance
let ServiceId::Name(svc_name) = &id else {
return Err(SvcError::ServiceNotFound(id));
};

let Some(inst) = svc_name.instance() else {
return Err(SvcError::ServiceNotFound(id));
};

let Some(tmpl) = &self.templates.get(svc_name.name()) else {
return Err(SvcError::ServiceNotFound(id));
};

let inner = tmpl.generate(inst.to_string())?;
let svc = StandardService::new(svc_name.name(), inner);

let uuid = self.register(svc_name.clone(), svc.boxed())?;

Ok(uuid)
}

fn stop(&self, id: impl IntoServiceId) -> SvcResult<()> {
fn stop(&self, id: impl IntoServiceId) -> SvcResult<Uuid> {
let id = self.resolve(id)?;

if self.svcs[&id].state == ServiceState::Stopped {
return Ok(());
return Ok(id);
}

log::debug!("Stopping service: {id} {}", self.svcs[&id].name);
self.get(id)
.and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?))
.and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?))?;

Ok(id)
}

fn notify_subscribers(&mut self, event: ServiceEvent) {
Expand Down Expand Up @@ -412,12 +455,19 @@ impl ServiceManager {
let mut res = vec![];

for (name, id) in &self.names {
res.push((*id, name.to_string()));
res.push((*id, name.clone()));
}
res
}),

SvmRequest::Register(rpc) => rpc.respond(|(name, svc)| self.register(&name, svc)),
SvmRequest::Register(rpc) => {
rpc.respond(|(name, svc)| self.register(ServiceName::from(name), svc));
}

SvmRequest::RegisterTemplate(rpc) => rpc.respond(|(name, tmpl)| {
self.templates.insert(name, tmpl);
Ok(())
}),

SvmRequest::Resolve(rpc) => rpc.respond(|id| self.resolve(&id)),

Expand Down
2 changes: 2 additions & 0 deletions crates/svc/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use std::time::Duration;
#[cfg(feature = "manager")]
use tokio::time::sleep;

#[derive(Debug, Clone, Copy)]
pub enum Retry {
No,
Limit(u32),
Forever,
}

#[derive(Debug, Clone, Copy)]
pub struct Policy {
pub retry: Retry,
pub delay: Option<Duration>,
Expand Down
42 changes: 26 additions & 16 deletions crates/svc/src/runservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use tokio::time::sleep;
use uuid::Uuid;

use crate::error::RunSvcError;
use crate::manager::ServiceEvent;
use crate::manager::{ServiceEvent, ServiceFunc};
use crate::policy::{Policy, Retry};
use crate::traits::{Service, ServiceRunner, ServiceState};
use crate::traits::{Service, ServiceRunner, ServiceState, StopResult};

#[allow(clippy::struct_field_names)]
struct State {
Expand Down Expand Up @@ -101,6 +101,12 @@ impl<S: Service> StandardService<S> {
}
}

impl<S: Service + 'static> StandardService<S> {
pub fn boxed(self) -> ServiceFunc {
Box::new(|a, b, c| self.run(a, b, c))
}
}

#[allow(clippy::too_many_lines)]
#[async_trait]
impl<S: Service> ServiceRunner for StandardService<S> {
Expand Down Expand Up @@ -192,23 +198,27 @@ impl<S: Service> ServiceRunner for StandardService<S> {
}
},
_ = rx.changed() => if *rx.borrow() == ServiceState::Stopped {
if S::SIGNAL_STOP {
log::trace!(target:target, "Service state change requested (graceful)");
svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?;
tokio::select! {
res = svc.run() => {
log::trace!(target:target, "Service finished running within timeout: {res:?}");
},
() = sleep(Duration::from_secs(1)) => {
log::warn!("timeout");
log::trace!(target:target, "Stopping service");
let stop = svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?;
match stop {
StopResult::Delivered => {
log::trace!(target:target, "Service state change requested (graceful)");
tokio::select! {
res = svc.run() => {
log::trace!(target:target, "Service finished running within timeout: {res:?}");
},
() = sleep(Duration::from_secs(1)) => {
log::warn!("timeout");
}
}
}
state.set(ServiceState::Stopping)?;
} else {
log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow());
if *rx.borrow_and_update() == ServiceState::Stopped {
state.set(ServiceState::Stopping)?;
}
StopResult::NotSupported => {
log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow());
if *rx.borrow_and_update() == ServiceState::Stopped {
state.set(ServiceState::Stopping)?;
}
}
}
}
}
Expand Down
Loading