diff --git a/crates/bifrost-api/src/service.rs b/crates/bifrost-api/src/service.rs index d17ecf25..76025cc1 100644 --- a/crates/bifrost-api/src/service.rs +++ b/crates/bifrost-api/src/service.rs @@ -1,16 +1,18 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; -use svc::traits::ServiceState; use uuid::Uuid; +use svc::serviceid::ServiceName; +use svc::traits::ServiceState; + use crate::Client; use crate::error::BifrostResult; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub struct Service { pub id: Uuid, - pub name: String, + pub name: ServiceName, pub state: ServiceState, } @@ -24,12 +26,12 @@ impl Client { self.get("service").await } - pub async fn service_stop(&self, id: Uuid) -> BifrostResult<()> { + pub async fn service_stop(&self, id: Uuid) -> BifrostResult { self.put(&format!("service/{id}"), ServiceState::Stopped) .await } - pub async fn service_start(&self, id: Uuid) -> BifrostResult<()> { + pub async fn service_start(&self, id: Uuid) -> BifrostResult { self.put(&format!("service/{id}"), ServiceState::Running) .await } diff --git a/crates/svc/src/error.rs b/crates/svc/src/error.rs index 6aeac961..aabe6990 100644 --- a/crates/svc/src/error.rs +++ b/crates/svc/src/error.rs @@ -3,7 +3,7 @@ use std::error::Error; use thiserror::Error; use crate::manager::{ServiceEvent, SvmRequest}; -use crate::serviceid::ServiceId; +use crate::serviceid::{ServiceId, ServiceName}; use crate::traits::ServiceState; #[derive(Error, Debug)] @@ -43,13 +43,22 @@ pub enum SvcError { ServiceNotFound(ServiceId), #[error("Service {0} already exists")] - ServiceAlreadyExists(String), + ServiceAlreadyExists(ServiceName), #[error("All services stopped")] Shutdown, #[error("Service has failed")] ServiceFailed, + + #[error("Templated service generation failed")] + ServiceGeneration(Box), +} + +impl SvcError { + pub fn generation(err: impl Error + Send + 'static) -> Self { + Self::ServiceGeneration(Box::new(err)) + } } #[derive(Error, Debug)] diff --git a/crates/svc/src/lib.rs b/crates/svc/src/lib.rs index 6b996099..f7ba6a05 100644 --- a/crates/svc/src/lib.rs +++ b/crates/svc/src/lib.rs @@ -10,3 +10,5 @@ pub mod manager; pub mod rpc; #[cfg(feature = "manager")] pub mod runservice; +#[cfg(feature = "manager")] +pub mod template; diff --git a/crates/svc/src/manager.rs b/crates/svc/src/manager.rs index 67b9d030..5b31cde2 100644 --- a/crates/svc/src/manager.rs +++ b/crates/svc/src/manager.rs @@ -15,13 +15,14 @@ use uuid::Uuid; use crate::error::{RunSvcError, SvcError, SvcResult}; use crate::rpc::RpcRequest; use crate::runservice::StandardService; -use crate::serviceid::{IntoServiceId, ServiceId}; +use crate::serviceid::{IntoServiceId, ServiceId, ServiceName}; +use crate::template::ServiceTemplate; use crate::traits::{Service, ServiceRunner, ServiceState}; #[derive(Debug)] pub struct ServiceInstance { tx: watch::Sender, - name: String, + name: ServiceName, state: ServiceState, abort_handle: AbortHandle, } @@ -60,13 +61,14 @@ impl ServiceEvent { /// A request to a [`ServiceManager`] pub enum SvmRequest { - Stop(RpcRequest>), - Start(RpcRequest>), + Stop(RpcRequest>), + Start(RpcRequest>), Status(RpcRequest>), - List(RpcRequest<(), Vec<(Uuid, String)>>), + List(RpcRequest<(), Vec<(Uuid, ServiceName)>>), Resolve(RpcRequest>), - LookupName(RpcRequest>), + LookupName(RpcRequest>), Register(RpcRequest<(String, ServiceFunc), SvcResult>), + RegisterTemplate(RpcRequest<(String, Box), SvcResult<()>>), Subscribe(RpcRequest, SvcResult>), Shutdown(RpcRequest<(), ()>), } @@ -128,11 +130,21 @@ impl SvmClient { .await? } - pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult<()> { + pub async fn register_template( + &mut self, + name: impl AsRef, + generator: impl ServiceTemplate + 'static, + ) -> SvcResult<()> { + let name = name.as_ref().to_string(); + self.rpc(SvmRequest::RegisterTemplate, (name, Box::new(generator))) + .await? + } + + pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult { self.rpc(SvmRequest::Start, id.service_id()).await? } - pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult<()> { + pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult { self.rpc(SvmRequest::Stop, id.service_id()).await? } @@ -140,7 +152,7 @@ impl SvmClient { self.rpc(SvmRequest::Resolve, id.service_id()).await? } - pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult { + pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult { self.rpc(SvmRequest::LookupName, id.service_id()).await? } @@ -197,7 +209,7 @@ impl SvmClient { self.rpc(SvmRequest::Status, id.service_id()).await? } - pub async fn list(&mut self) -> SvcResult> { + pub async fn list(&mut self) -> SvcResult> { self.rpc(SvmRequest::List, ()).await } @@ -214,6 +226,10 @@ impl Debug for SvmRequest { Self::Status(arg0) => f.debug_tuple("Status").field(arg0).finish(), Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(), Self::Register(_arg0) => f.debug_tuple("Register").field(&"").finish(), + Self::RegisterTemplate(_arg0) => f + .debug_tuple("RegisterTemplate") + .field(&"") + .finish(), Self::Resolve(arg0) => f.debug_tuple("Resolve").field(arg0).finish(), Self::LookupName(arg0) => f.debug_tuple("ResolveName").field(arg0).finish(), Self::Subscribe(_arg0) => f.debug_tuple("Subscribe").finish(), @@ -229,8 +245,9 @@ pub struct ServiceManager { service_tx: mpsc::UnboundedSender, subscribers: BTreeMap>, svcs: BTreeMap, - names: BTreeMap, + names: BTreeMap, tasks: JoinSet>, + templates: BTreeMap>, shutdown: bool, } @@ -254,6 +271,7 @@ impl ServiceManager { svcs: BTreeMap::new(), names: BTreeMap::new(), tasks: JoinSet::new(), + templates: BTreeMap::new(), shutdown: false, } } @@ -284,8 +302,7 @@ impl ServiceManager { self.control_tx.clone() } - fn register(&mut self, name: &str, svc: ServiceFunc) -> SvcResult { - let name = name.to_string(); + fn register(&mut self, name: ServiceName, svc: ServiceFunc) -> SvcResult { if self.names.contains_key(&name) { return Err(SvcError::ServiceAlreadyExists(name)); } @@ -297,7 +314,7 @@ impl ServiceManager { let rec = ServiceInstance { tx, - name: name.to_string(), + name: name.clone(), state: ServiceState::Registered, abort_handle, }; @@ -317,7 +334,7 @@ impl ServiceManager { match &id { ServiceId::Name(name) => self .names - .get(name.as_str()) + .get(name) .ok_or_else(|| SvcError::ServiceNotFound(id)) .copied(), ServiceId::Id(uuid) => { @@ -351,23 +368,49 @@ impl ServiceManager { Ok(&self.svcs[&id]) } - fn start(&self, id: impl IntoServiceId) -> SvcResult<()> { - self.get(&id).and_then(|svc| { + fn start(&mut self, id: impl IntoServiceId) -> SvcResult { + let id = id.service_id(); + + // if the service is known, attempt to start it + if let Ok(svc) = self.get(&id) { log::debug!("Starting service: {id} {}", &svc.name); - Ok(svc.tx.send(ServiceState::Running)?) - }) + svc.tx.send(ServiceState::Running)?; + return self.resolve(&id); + } + + // ..else, check if it's a named instance + let ServiceId::Name(svc_name) = &id else { + return Err(SvcError::ServiceNotFound(id)); + }; + + let Some(inst) = svc_name.instance() else { + return Err(SvcError::ServiceNotFound(id)); + }; + + let Some(tmpl) = &self.templates.get(svc_name.name()) else { + return Err(SvcError::ServiceNotFound(id)); + }; + + let inner = tmpl.generate(inst.to_string())?; + let svc = StandardService::new(svc_name.name(), inner); + + let uuid = self.register(svc_name.clone(), svc.boxed())?; + + Ok(uuid) } - fn stop(&self, id: impl IntoServiceId) -> SvcResult<()> { + fn stop(&self, id: impl IntoServiceId) -> SvcResult { let id = self.resolve(id)?; if self.svcs[&id].state == ServiceState::Stopped { - return Ok(()); + return Ok(id); } log::debug!("Stopping service: {id} {}", self.svcs[&id].name); self.get(id) - .and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?)) + .and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?))?; + + Ok(id) } fn notify_subscribers(&mut self, event: ServiceEvent) { @@ -412,12 +455,19 @@ impl ServiceManager { let mut res = vec![]; for (name, id) in &self.names { - res.push((*id, name.to_string())); + res.push((*id, name.clone())); } res }), - SvmRequest::Register(rpc) => rpc.respond(|(name, svc)| self.register(&name, svc)), + SvmRequest::Register(rpc) => { + rpc.respond(|(name, svc)| self.register(ServiceName::from(name), svc)); + } + + SvmRequest::RegisterTemplate(rpc) => rpc.respond(|(name, tmpl)| { + self.templates.insert(name, tmpl); + Ok(()) + }), SvmRequest::Resolve(rpc) => rpc.respond(|id| self.resolve(&id)), diff --git a/crates/svc/src/policy.rs b/crates/svc/src/policy.rs index 6bd944c6..76f72cbb 100644 --- a/crates/svc/src/policy.rs +++ b/crates/svc/src/policy.rs @@ -4,12 +4,14 @@ use std::time::Duration; #[cfg(feature = "manager")] use tokio::time::sleep; +#[derive(Debug, Clone, Copy)] pub enum Retry { No, Limit(u32), Forever, } +#[derive(Debug, Clone, Copy)] pub struct Policy { pub retry: Retry, pub delay: Option, diff --git a/crates/svc/src/runservice.rs b/crates/svc/src/runservice.rs index fe1e877a..bf2058fd 100644 --- a/crates/svc/src/runservice.rs +++ b/crates/svc/src/runservice.rs @@ -5,9 +5,9 @@ use tokio::time::sleep; use uuid::Uuid; use crate::error::RunSvcError; -use crate::manager::ServiceEvent; +use crate::manager::{ServiceEvent, ServiceFunc}; use crate::policy::{Policy, Retry}; -use crate::traits::{Service, ServiceRunner, ServiceState}; +use crate::traits::{Service, ServiceRunner, ServiceState, StopResult}; #[allow(clippy::struct_field_names)] struct State { @@ -101,6 +101,12 @@ impl StandardService { } } +impl StandardService { + pub fn boxed(self) -> ServiceFunc { + Box::new(|a, b, c| self.run(a, b, c)) + } +} + #[allow(clippy::too_many_lines)] #[async_trait] impl ServiceRunner for StandardService { @@ -192,23 +198,27 @@ impl ServiceRunner for StandardService { } }, _ = rx.changed() => if *rx.borrow() == ServiceState::Stopped { - if S::SIGNAL_STOP { - log::trace!(target:target, "Service state change requested (graceful)"); - svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?; - tokio::select! { - res = svc.run() => { - log::trace!(target:target, "Service finished running within timeout: {res:?}"); - }, - () = sleep(Duration::from_secs(1)) => { - log::warn!("timeout"); + log::trace!(target:target, "Stopping service"); + let stop = svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?; + match stop { + StopResult::Delivered => { + log::trace!(target:target, "Service state change requested (graceful)"); + tokio::select! { + res = svc.run() => { + log::trace!(target:target, "Service finished running within timeout: {res:?}"); + }, + () = sleep(Duration::from_secs(1)) => { + log::warn!("timeout"); + } } - } - state.set(ServiceState::Stopping)?; - } else { - log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow()); - if *rx.borrow_and_update() == ServiceState::Stopped { state.set(ServiceState::Stopping)?; } + StopResult::NotSupported => { + log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow()); + if *rx.borrow_and_update() == ServiceState::Stopped { + state.set(ServiceState::Stopping)?; + } + } } } } diff --git a/crates/svc/src/serviceid.rs b/crates/svc/src/serviceid.rs index 636324ef..0efe9fc5 100644 --- a/crates/svc/src/serviceid.rs +++ b/crates/svc/src/serviceid.rs @@ -1,13 +1,70 @@ use std::fmt::{Debug, Display}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(from = "String", into = "String")] +pub struct ServiceName { + name: String, + instance: Option, +} + +impl From for String { + fn from(value: ServiceName) -> Self { + value.to_string() + } +} + +impl ServiceName { + #[must_use] + pub const fn new(name: String, instance: Option) -> Self { + Self { name, instance } + } + + // suppress clippy false-positive + #[allow(clippy::missing_const_for_fn)] + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + + #[must_use] + pub fn instance(&self) -> Option<&str> { + self.instance.as_deref() + } +} + +impl Display for ServiceName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self { + name, + instance: None, + } => write!(f, "{name}"), + Self { + name, + instance: Some(instance), + } => write!(f, "{name}@{instance}"), + } + } +} + #[derive(Debug, Clone)] pub enum ServiceId { - Name(String), + Name(ServiceName), Id(Uuid), } +impl ServiceId { + pub fn instance(name: impl Into, instance: impl Into) -> Self { + Self::Name(ServiceName { + name: name.into(), + instance: Some(instance.into()), + }) + } +} + impl Display for ServiceId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -41,13 +98,13 @@ impl IntoServiceId for Uuid { impl IntoServiceId for String { fn service_id(self) -> ServiceId { - ServiceId::Name(self) + ServiceId::Name(ServiceName::from(self)) } } impl IntoServiceId for &str { fn service_id(self) -> ServiceId { - ServiceId::Name(self.to_string()) + ServiceId::Name(ServiceName::from(self)) } } @@ -59,6 +116,38 @@ impl From for ServiceId { impl From for ServiceId { fn from(value: String) -> Self { - Self::Name(value) + value.service_id() + } +} + +impl From for ServiceName { + fn from(value: String) -> Self { + if let Some((name, instance)) = value.split_once('@') { + Self { + name: name.to_string(), + instance: Some(instance.to_string()), + } + } else { + Self { + name: value, + instance: None, + } + } + } +} + +impl From<&str> for ServiceName { + fn from(value: &str) -> Self { + if let Some((name, instance)) = value.split_once('@') { + Self { + name: name.to_string(), + instance: Some(instance.to_string()), + } + } else { + Self { + name: value.to_string(), + instance: None, + } + } } } diff --git a/crates/svc/src/template.rs b/crates/svc/src/template.rs new file mode 100644 index 00000000..04393b92 --- /dev/null +++ b/crates/svc/src/template.rs @@ -0,0 +1,70 @@ +use async_trait::async_trait; + +#[cfg(feature = "manager")] +use crate::error::RunSvcError; +use crate::error::SvcError; +use crate::traits::{BoxDynService, Service, StopResult}; + +#[cfg(feature = "manager")] +pub trait ServiceTemplate: Send { + fn generate(&self, instance: String) -> Result; +} + +pub struct ErrorAdapter { + svc: S, +} + +impl ErrorAdapter { + pub const fn new(svc: S) -> Self { + Self { svc } + } +} + +#[async_trait] +impl Service for ErrorAdapter { + type Error = RunSvcError; + + async fn configure(&mut self) -> Result<(), Self::Error> { + self.svc + .configure() + .await + .map_err(|err| RunSvcError::ServiceError(Box::new(err))) + } + + async fn start(&mut self) -> Result<(), Self::Error> { + self.svc + .start() + .await + .map_err(|err| RunSvcError::ServiceError(Box::new(err))) + } + + async fn run(&mut self) -> Result<(), Self::Error> { + self.svc + .run() + .await + .map_err(|err| RunSvcError::ServiceError(Box::new(err))) + } + + async fn stop(&mut self) -> Result<(), Self::Error> { + self.svc + .stop() + .await + .map_err(|err| RunSvcError::ServiceError(Box::new(err))) + } + + async fn signal_stop(&mut self) -> Result { + self.svc + .signal_stop() + .await + .map_err(|err| RunSvcError::ServiceError(Box::new(err))) + } +} + +impl ServiceTemplate for F +where + F: Fn(String) -> Result + Send, +{ + fn generate(&self, instance: String) -> Result { + self(instance) + } +} diff --git a/crates/svc/src/traits.rs b/crates/svc/src/traits.rs index 6f1bd464..fa4ca583 100644 --- a/crates/svc/src/traits.rs +++ b/crates/svc/src/traits.rs @@ -1,6 +1,7 @@ use std::error::Error; use async_trait::async_trait; +use futures::future::BoxFuture; use serde::{Deserialize, Serialize}; #[cfg(feature = "manager")] @@ -8,6 +9,8 @@ use crate::error::RunSvcError; #[cfg(feature = "manager")] use crate::manager::ServiceEvent; #[cfg(feature = "manager")] +use crate::template::ErrorAdapter; +#[cfg(feature = "manager")] use std::future::Future; #[cfg(feature = "manager")] use tokio::sync::{mpsc, watch}; @@ -69,10 +72,14 @@ pub enum ServiceState { Failed, } +pub enum StopResult { + Delivered, + NotSupported, +} + #[async_trait] pub trait Service: Send { type Error: Error + Send + 'static; - const SIGNAL_STOP: bool = false; async fn configure(&mut self) -> Result<(), Self::Error> { Ok(()) @@ -88,8 +95,44 @@ pub trait Service: Send { Ok(()) } - async fn signal_stop(&mut self) -> Result<(), Self::Error> { - Ok(()) + async fn signal_stop(&mut self) -> Result { + Ok(StopResult::NotSupported) + } + + #[cfg(feature = "manager")] + fn boxed(self) -> BoxDynService + where + Self: Sized + Unpin + 'static, + { + Box::new(ErrorAdapter::new(self)) as BoxDynService + } +} + +#[cfg(feature = "manager")] +pub type BoxDynService = Box + Unpin + 'static>; + +#[cfg(feature = "manager")] +impl Service for BoxDynService { + type Error = RunSvcError; + + fn run<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> { + (**self).run() + } + + fn configure<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> { + (**self).configure() + } + + fn start<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> { + (**self).start() + } + + fn stop<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> { + (**self).stop() + } + + fn signal_stop<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result> { + (**self).signal_stop() } } @@ -106,25 +149,14 @@ pub trait ServiceRunner { #[cfg(feature = "manager")] #[async_trait] -impl Service for F +impl Service for F where + E: Error + Send + 'static, F: Future> + Send + Unpin, { type Error = E; - async fn configure(&mut self) -> Result<(), E> { - Ok(()) - } - - async fn start(&mut self) -> Result<(), E> { - Ok(()) - } - async fn run(&mut self) -> Result<(), E> { self.await } - - async fn stop(&mut self) -> Result<(), E> { - Ok(()) - } } diff --git a/src/backend/mod.rs b/src/backend/mod.rs index efe89146..5b73b6ee 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -1,15 +1 @@ pub mod z2m; - -use std::sync::Arc; - -use async_trait::async_trait; -use tokio::sync::broadcast::Receiver; - -use bifrost_api::backend::BackendRequest; - -use crate::error::ApiResult; - -#[async_trait] -pub trait Backend { - async fn run_forever(self, chan: Receiver>) -> ApiResult<()>; -} diff --git a/src/backend/z2m/mod.rs b/src/backend/z2m/mod.rs index 26c08637..cefd96db 100644 --- a/src/backend/z2m/mod.rs +++ b/src/backend/z2m/mod.rs @@ -13,17 +13,22 @@ use std::time::Duration; use async_trait::async_trait; use futures::StreamExt; use native_tls::TlsConnector; +use svc::error::SvcError; +use svc::template::ServiceTemplate; +use svc::traits::{BoxDynService, Service}; +use thiserror::Error; +use tokio::net::TcpStream; use tokio::select; use tokio::sync::broadcast::Receiver; use tokio::sync::{Mutex, mpsc}; -use tokio::time::sleep; -use tokio_tungstenite::{Connector, connect_async_tls_with_config}; +use tokio_tungstenite::{ + Connector, MaybeTlsStream, WebSocketStream, connect_async_tls_with_config, +}; use bifrost_api::backend::BackendRequest; use hue::api::ResourceLink; use z2m::update::DeviceUpdate; -use crate::backend::Backend; use crate::backend::z2m::entertainment::EntStream; use crate::backend::z2m::learn::SceneLearn; use crate::backend::z2m::websocket::Z2mWebSocket; @@ -31,6 +36,37 @@ use crate::config::{AppConfig, Z2mServer}; use crate::error::{ApiError, ApiResult}; use crate::model::throttle::Throttle; use crate::resource::Resources; +use crate::server::appstate::AppState; + +#[derive(Error, Debug)] +pub enum TemplateError { + #[error("No config found for z2m server {0:?}")] + NotFound(String), +} + +pub struct Z2mServiceTemplate { + state: AppState, +} + +impl Z2mServiceTemplate { + #[must_use] + pub const fn new(state: AppState) -> Self { + Self { state } + } +} + +impl ServiceTemplate for Z2mServiceTemplate { + fn generate(&self, name: String) -> Result { + let config = self.state.config(); + let Some(server) = config.z2m.servers.get(&name) else { + return Err(SvcError::generation(TemplateError::NotFound(name))); + }; + let svc = Z2mBackend::new(name, server.clone(), config, self.state.res.clone()) + .map_err(SvcError::generation)?; + + Ok(svc.boxed()) + } +} pub struct Z2mBackend { name: String, @@ -46,6 +82,7 @@ pub struct Z2mBackend { counter: u32, fps: u32, throttle: Throttle, + socket: Option>>, // for sending delayed messages over the websocket message_rx: mpsc::UnboundedReceiver<(String, DeviceUpdate)>, @@ -86,6 +123,7 @@ impl Z2mBackend { fps, message_rx, message_tx, + socket: None, counter: 0, }) } @@ -119,8 +157,10 @@ impl Z2mBackend { } #[async_trait] -impl Backend for Z2mBackend { - async fn run_forever(mut self, mut chan: Receiver>) -> ApiResult<()> { +impl Service for Z2mBackend { + type Error = ApiError; + + async fn start(&mut self) -> ApiResult<()> { // let's not include auth tokens in log output let sanitized_url = self.server.get_sanitized_url(); let url = self.server.get_url(); @@ -154,22 +194,33 @@ impl Backend for Z2mBackend { None }; - loop { - log::info!("[{}] Connecting to {}", self.name, &sanitized_url); - match connect_async_tls_with_config(url.as_str(), None, false, connector.clone()).await - { - Ok((socket, _)) => { - let z2m_socket = Z2mWebSocket::new(self.name.clone(), socket); - let res = self.event_loop(&mut chan, z2m_socket).await; - if let Err(err) = res { - log::error!("[{}] Event loop broke: {err}", self.name); - } - } - Err(err) => { - log::error!("[{}] Connect failed: {err:?}", self.name); - } + log::info!("[{}] Connecting to {}", self.name, &sanitized_url); + match connect_async_tls_with_config(url.as_str(), None, false, connector.clone()).await { + Ok((socket, _)) => { + self.socket = Some(socket); + Ok(()) + } + Err(err) => { + log::error!("[{}] Connect failed: {err:?}", self.name); + Err(err.into()) } - sleep(Duration::from_millis(2000)).await; } } + + async fn run(&mut self) -> ApiResult<()> { + if let Some(socket) = self.socket.take() { + let z2m_socket = Z2mWebSocket::new(self.name.clone(), socket); + let mut chan = self.state.lock().await.backend_event_stream(); + let res = self.event_loop(&mut chan, z2m_socket).await; + if let Err(err) = res { + log::error!("[{}] Event loop broke: {err}", self.name); + } + } + Ok(()) + } + + async fn stop(&mut self) -> ApiResult<()> { + self.socket.take(); + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index c0f3c928..fd8adadf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,14 @@ use std::io::Write; -use svc::manager::ServiceManager; - -use bifrost::backend::Backend; -use bifrost::backend::z2m::Z2mBackend; +use bifrost::backend; use bifrost::config; use bifrost::error::ApiResult; use bifrost::server::appstate::AppState; use bifrost::server::http::HttpServer; use bifrost::server::mdns::MdnsService; use bifrost::server::{self, Protocol}; +use svc::manager::ServiceManager; +use svc::serviceid::ServiceId; /* * Formatter function to output in syslog format. This makes sense when running @@ -114,18 +113,12 @@ async fn build_tasks(appstate: &AppState) -> ApiResult<()> { mgr.register_service("entertainment", svc).await?; // register all z2m backends as services - for (name, server) in &appstate.config().z2m.servers { - let client = Z2mBackend::new( - name.clone(), - server.clone(), - appstate.config(), - appstate.res.clone(), - )?; - let stream = appstate.res.lock().await.backend_event_stream(); - let name = format!("z2m-{name}"); - let svc = client.run_forever(stream); - - mgr.register_function(name, svc).await?; + let template = backend::z2m::Z2mServiceTemplate::new(appstate.clone()); + mgr.register_template("z2m", template).await?; + + // start named z2m instances, since templated services appear when started + for name in appstate.config().z2m.servers.keys() { + mgr.start(ServiceId::instance("z2m", name)).await?; } // finally, iterate over all services and start them diff --git a/src/routes/bifrost/backend.rs b/src/routes/bifrost/backend.rs index 27831558..cb1e4169 100644 --- a/src/routes/bifrost/backend.rs +++ b/src/routes/bifrost/backend.rs @@ -4,7 +4,6 @@ use axum::routing::post; use bifrost_api::config::Z2mServer; -use crate::backend::Backend; use crate::backend::z2m::Z2mBackend; use crate::routes::bifrost::BifrostApiResult; use crate::routes::extractor::Json; @@ -20,12 +19,10 @@ async fn post_backend_z2m( let mut mgr = state.manager(); - let client = Z2mBackend::new(name.clone(), server, state.config(), state.res.clone())?; - let stream = state.res.lock().await.backend_event_stream(); + let svc = Z2mBackend::new(name.clone(), server, state.config(), state.res.clone())?; let name = format!("z2m-{name}"); - let svc = client.run_forever(stream); - mgr.register_function(&name, svc).await?; + mgr.register_service(&name, svc).await?; mgr.start(&name).await?; Ok(Json(())) diff --git a/src/routes/bifrost/service.rs b/src/routes/bifrost/service.rs index 20dc3547..f8242731 100644 --- a/src/routes/bifrost/service.rs +++ b/src/routes/bifrost/service.rs @@ -34,20 +34,20 @@ async fn put_service( State(state): State, Path(id): Path, Json(service_state): Json, -) -> BifrostApiResult> { +) -> BifrostApiResult> { let mut mgr = state.manager(); - match service_state { + let uuid = match service_state { ServiceState::Registered | ServiceState::Configured | ServiceState::Starting | ServiceState::Stopping - | ServiceState::Failed => {} + | ServiceState::Failed => mgr.resolve(id).await?, ServiceState::Running => mgr.start(id).await?, ServiceState::Stopped => mgr.stop(id).await?, - } + }; - Ok(Json(())) + Ok(Json(uuid)) } pub fn router() -> Router { diff --git a/src/server/entertainment.rs b/src/server/entertainment.rs index faeb6232..520fcde9 100644 --- a/src/server/entertainment.rs +++ b/src/server/entertainment.rs @@ -261,7 +261,6 @@ impl EntertainmentService { #[async_trait] impl Service for EntertainmentService { type Error = ApiError; - const SIGNAL_STOP: bool = false; async fn configure(&mut self) -> Result<(), Self::Error> { let mut bldr = SslContext::builder(SslMethod::dtls_server())?; @@ -321,8 +320,4 @@ impl Service for EntertainmentService { self.udp.take(); Ok(()) } - - async fn signal_stop(&mut self) -> Result<(), Self::Error> { - Ok(()) - } } diff --git a/src/server/http.rs b/src/server/http.rs index b847c292..d39bc5b8 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -14,7 +14,7 @@ use hyper::body::Incoming; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; -use svc::traits::Service; +use svc::traits::{Service, StopResult}; use crate::error::{ApiError, ApiResult}; @@ -39,7 +39,6 @@ where A::Future: Send, { type Error = ApiError; - const SIGNAL_STOP: bool = true; async fn start(&mut self) -> Result<(), ApiError> { log::info!("Opening listen port on {}", self.addr); @@ -66,9 +65,9 @@ where Ok(()) } - async fn signal_stop(&mut self) -> Result<(), ApiError> { + async fn signal_stop(&mut self) -> Result { self.handle.graceful_shutdown(Some(Duration::from_secs(1))); - Ok(()) + Ok(StopResult::Delivered) } } diff --git a/src/server/mdns.rs b/src/server/mdns.rs index 98b2330e..5e3d1dce 100644 --- a/src/server/mdns.rs +++ b/src/server/mdns.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use mac_address::MacAddress; use mdns_sd::{ServiceDaemon, ServiceInfo}; -use svc::traits::Service; +use svc::traits::{Service, StopResult}; use tokio::sync::watch::{self, Receiver, Sender}; use crate::error::ApiError; @@ -33,7 +33,6 @@ impl MdnsService { #[async_trait] impl Service for MdnsService { type Error = ApiError; - const SIGNAL_STOP: bool = true; async fn configure(&mut self) -> Result<(), Self::Error> { Ok(()) @@ -105,7 +104,7 @@ impl Service for MdnsService { Ok(()) } - async fn signal_stop(&mut self) -> Result<(), Self::Error> { + async fn signal_stop(&mut self) -> Result { if let Some(signal) = self.signal.take() { log::debug!("Shutting down mdns.."); signal @@ -113,6 +112,6 @@ impl Service for MdnsService { .map_err(|_| ApiError::service_error("Failed to send stop signal"))?; } - Ok(()) + Ok(StopResult::Delivered) } } diff --git a/src/server/ssdp.rs b/src/server/ssdp.rs index d50417db..dc4add53 100644 --- a/src/server/ssdp.rs +++ b/src/server/ssdp.rs @@ -7,7 +7,7 @@ use tokio::sync::Mutex; use tokio::sync::watch::{self, Receiver, Sender}; use tokio_ssdp::{Device, Server}; -use svc::traits::Service; +use svc::traits::{Service, StopResult}; use uuid::Uuid; use crate::error::ApiError; @@ -57,7 +57,6 @@ impl SsdpService { #[async_trait] impl Service for SsdpService { type Error = ApiError; - const SIGNAL_STOP: bool = true; async fn configure(&mut self) -> Result<(), Self::Error> { Ok(()) @@ -121,14 +120,14 @@ impl Service for SsdpService { Ok(()) } - async fn signal_stop(&mut self) -> Result<(), Self::Error> { + async fn signal_stop(&mut self) -> Result { if let Some(signal) = self.signal.take() { log::debug!("Shutting down ssdp.."); signal .send(true) .map_err(|_| ApiError::service_error("Failed to send stop signal"))?; } - Ok(()) + Ok(StopResult::Delivered) } }