diff --git a/Cargo.lock b/Cargo.lock index b1da201..5c7f6a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1259,7 +1259,7 @@ dependencies = [ [[package]] name = "kubelet" version = "0.5.0" -source = "git+https://github.com/deislabs/krustlet.git?rev=ac218b38ba564de806568e49d9e38aaef9f41537#ac218b38ba564de806568e49d9e38aaef9f41537" +source = "git+https://github.com/stackabletech/krustlet.git?rev=bb8bb42c9400a565df4be04f357e61934fb277c6#bb8bb42c9400a565df4be04f357e61934fb277c6" dependencies = [ "anyhow", "async-stream 0.3.0", @@ -1308,7 +1308,7 @@ dependencies = [ [[package]] name = "kubelet-derive" version = "0.1.0" -source = "git+https://github.com/deislabs/krustlet.git?rev=ac218b38ba564de806568e49d9e38aaef9f41537#ac218b38ba564de806568e49d9e38aaef9f41537" +source = "git+https://github.com/stackabletech/krustlet.git?rev=bb8bb42c9400a565df4be04f357e61934fb277c6#bb8bb42c9400a565df4be04f357e61934fb277c6" dependencies = [ "quote 1.0.7", "syn 1.0.50", @@ -1586,7 +1586,7 @@ dependencies = [ [[package]] name = "oci-distribution" version = "0.4.0" -source = "git+https://github.com/deislabs/krustlet.git?rev=ac218b38ba564de806568e49d9e38aaef9f41537#ac218b38ba564de806568e49d9e38aaef9f41537" +source = "git+https://github.com/stackabletech/krustlet.git?rev=bb8bb42c9400a565df4be04f357e61934fb277c6#bb8bb42c9400a565df4be04f357e61934fb277c6" dependencies = [ "anyhow", "futures-util", diff --git a/src/provider/states/running.rs b/src/provider/states/running.rs index 693bae3..49f9331 100644 --- a/src/provider/states/running.rs +++ b/src/provider/states/running.rs @@ -1,10 +1,11 @@ +use anyhow::anyhow; use k8s_openapi::api::core::v1::{ ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus, PodCondition, }; use kubelet::pod::Pod; use kubelet::state::prelude::*; use kubelet::state::{State, Transition}; -use log::{debug, trace}; +use log::{debug, info, trace}; use crate::provider::states::failed::Failed; use crate::provider::states::installing::Installing; @@ -12,6 +13,7 @@ use crate::provider::states::make_status_with_containers_and_condition; use crate::provider::PodState; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; use k8s_openapi::chrono; +use tokio::time::Duration; #[derive(Debug, TransitionTo)] #[transition_to(Failed, Running, Installing)] @@ -34,17 +36,57 @@ impl State for Running { pod_state: &mut PodState, _pod: &Pod, ) -> Transition { + // We loop here indefinitely and "wake up" periodically to check if the service is still + // up and running + // Interruption of this loop is triggered externally by the Krustlet code when + // - the pod which this state machine refers to gets deleted + // - Krustlet shuts down loop { - tokio::select! { - _ = tokio::time::delay_for(std::time::Duration::from_secs(10)) => { - trace!("Checking if service {} is still running.", &pod_state.service_name); + tokio::time::delay_for(Duration::from_secs(10)).await; + trace!( + "Checking if service {} is still running.", + &pod_state.service_name + ); + + // Iterate over all units and check their state + // if the [`service_units`] Option is a None variant, return a failed state + // as we need to run something otherwise we are not doing anything + let systemd_units = match &pod_state.service_units { + Some(units) => units, + None => return Transition::Complete(Err(anyhow!(format!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name)))), + }; + + for unit in systemd_units { + match pod_state.systemd_manager.is_running(&unit.get_name()) { + Ok(true) => trace!( + "Unit [{}] of service [{}] still running ...", + &unit.get_name(), + pod_state.service_name + ), + Ok(false) => { + info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", pod_state.service_name, unit.get_name()); + return Transition::next( + self, + Failed { + message: "".to_string(), + }, + ); + } + Err(dbus_error) => { + info!( + "Error querying ActiveState for Unit [{}] of service [{}]: [{}].", + pod_state.service_name, + unit.get_name(), + dbus_error + ); + return Transition::Complete(Err(dbus_error)); + } } } - // TODO: We are not watching the service yet, need to subscribe to events and - // react to those } } + // test async fn json_status( &self, pod_state: &mut PodState, diff --git a/src/provider/states/starting.rs b/src/provider/states/starting.rs index 75cdada..5f32803 100644 --- a/src/provider/states/starting.rs +++ b/src/provider/states/starting.rs @@ -6,7 +6,10 @@ use crate::provider::states::failed::Failed; use crate::provider::states::running::Running; use crate::provider::states::setup_failed::SetupFailed; use crate::provider::PodState; -use log::{error, info, warn}; +use anyhow::anyhow; +use log::{debug, error, info, warn}; +use std::time::Instant; +use tokio::time::Duration; #[derive(Default, Debug, TransitionTo)] #[transition_to(Running, Failed, SetupFailed)] @@ -36,6 +39,37 @@ impl State for Starting { ); return Transition::Complete(Err(enable_error)); } + + let start_time = Instant::now(); + // TODO: does this need to be configurable, or ar we happy with a hard coded value + // for now. I've briefly looked at the podspec and couldn't identify a good field + // to use for this - also, currently this starts containers (= systemd units) in + // order and waits 10 seconds for every unit, so a service with five containers + // would take 50 seconds until it reported running - which is totally fine in case + // the units actually depend on each other, but a case could be made for waiting + // once at the end + while start_time.elapsed().as_secs() < 10 { + tokio::time::delay_for(Duration::from_secs(1)).await; + debug!( + "Checking if unit [{}] is still up and running.", + &unit.get_name() + ); + match pod_state.systemd_manager.is_running(&unit.get_name()) { + Ok(true) => debug!( + "Service [{}] still running after [{}] seconds", + &unit.get_name(), + start_time.elapsed().as_secs() + ), + Ok(false) => { + return Transition::Complete(Err(anyhow!(format!( + "Unit [{}] stopped unexpectedly during startup after [{}] seconds.", + &unit.get_name(), + start_time.elapsed().as_secs() + )))) + } + Err(dbus_error) => return Transition::Complete(Err(dbus_error)), + } + } } } else { warn!( diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 5928b96..5e0ca4a 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -5,7 +5,7 @@ //! use crate::provider::systemdmanager::systemdunit::SystemDUnit; use anyhow::anyhow; -use dbus::arg::{AppendAll, ReadAll}; +use dbus::arg::{AppendAll, ReadAll, Variant}; use dbus::blocking::SyncConnection; use dbus::strings::Member; use dbus::Path; @@ -25,6 +25,7 @@ pub enum UnitTypes { const SYSTEMD_DESTINATION: &str = "org.freedesktop.systemd1"; const SYSTEMD_NODE: &str = "/org/freedesktop/systemd1"; const SYSTEMD_MANAGER_INTERFACE: &str = "org.freedesktop.systemd1.Manager"; +const DBUS_PROPERTIES_INTERFACE: &str = "org.freedesktop.DBus.Properties"; /// The main way of interacting with this module, this struct offers /// the public methods for managing service units. @@ -171,10 +172,7 @@ impl SystemdManager { } let unit_file = self.units_directory.join(&unit_name); - if linked_unit_file - && unit_file.exists() - && unit_file.symlink_metadata()?.file_type().is_file() - { + if unit_file.exists() && unit_file.symlink_metadata()?.file_type().is_file() { // Handle the special case where we need to replace an actual file with a symlink // This only occurs when switching from writing the file // directly into the units folder to using a linked file - should not happen in practice @@ -357,6 +355,28 @@ impl SystemdManager { } } + pub fn is_running(&self, unit: &str) -> Result { + let unit_node = self + .method_call("GetUnit", (&unit,)) + .map(|r: (Path,)| r.0)?; + + let proxy = self + .connection + .with_proxy(SYSTEMD_DESTINATION, &unit_node, self.timeout); + + let active_state = proxy + .method_call( + DBUS_PROPERTIES_INTERFACE, + "Get", + ("org.freedesktop.systemd1.Unit", "ActiveState"), + ) + .map(|r: (Variant,)| r.0)?; + + // TODO: I think this can panic, there should be a get() method on Variant that returns + // an option, but I've not yet been able to get that to work + Ok(active_state.0 == "active") + } + // Symlink a unit file into the systemd unit folder // This is not public on purpose, as [create] should be the normal way to link unit files // when using this crate