diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 40fc125..26bfa68 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v2 - run: rustup update stable && rustup default stable - - run: sudo apt-get install libdbus-1-dev pkg-config libdbus-1-3 + - run: sudo apt-get install libdbus-1-dev libsystemd-dev pkg-config libdbus-1-3 - run: rustup component add rustfmt - run: cargo fmt --all -- --check @@ -23,7 +23,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Install dbus dependencies - run: sudo apt-get install libdbus-1-dev pkg-config libdbus-1-3 + run: sudo apt-get install libdbus-1-dev libsystemd-dev pkg-config libdbus-1-3 - uses: actions/checkout@v1 - uses: actions-rs/toolchain@v1 with: @@ -40,7 +40,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Install dbus dependencies - run: sudo apt-get install libdbus-1-dev pkg-config libdbus-1-3 + run: sudo apt-get install libdbus-1-dev libsystemd-dev pkg-config libdbus-1-3 - name: Build run: cargo build --verbose - name: Run tests @@ -53,4 +53,4 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/audit-check@v1 with: - token: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/Cargo.lock b/Cargo.lock index bdd62f7..9f7e7ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -221,6 +221,12 @@ dependencies = [ "safemem", ] +[[package]] +name = "build-env" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cf89846ef2b2674ef1c153256cec98fba587c72bf4ea2c4b2f6d91a19f55926" + [[package]] name = "bumpalo" version = "3.6.1" @@ -373,6 +379,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "cstr-argument" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20bd4e8067c20c7c3a4dea759ef91d4b18418ddb5bd8837ef6e2f2f93ca7ccbb" +dependencies = [ + "cfg-if 0.1.10", + "memchr", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -576,7 +592,28 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" dependencies = [ - "foreign-types-shared", + "foreign-types-shared 0.1.1", +] + +[[package]] +name = "foreign-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" +dependencies = [ + "foreign-types-macros", + "foreign-types-shared 0.3.0", +] + +[[package]] +name = "foreign-types-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63f713f8b2aa9e24fec85b0e290c56caee12e3b6ae0aeeda238a75b28251afd6" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -585,6 +622,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "foreign-types-shared" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7684cf33bb7f28497939e8c7cf17e3e4e3b8d9a0080ffa4f8ae2f515442ee855" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1331,6 +1374,17 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libsystemd-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6e03fd580bcecda68dcdcd5297085ade6a3dc552cd8b030d2b94a9b089ef7ab8" +dependencies = [ + "build-env", + "libc", + "pkg-config", +] + [[package]] name = "linked-hash-map" version = "0.5.4" @@ -1627,7 +1681,7 @@ checksum = "a61075b62a23fef5a29815de7536d940aa35ce96d18ce0cc5076272db678a577" dependencies = [ "bitflags", "cfg-if 1.0.0", - "foreign-types", + "foreign-types 0.3.2", "libc", "once_cell", "openssl-sys", @@ -2582,6 +2636,7 @@ dependencies = [ "stackable_config", "strum", "strum_macros", + "systemd", "tar", "thiserror", "tokio 1.5.0", @@ -2724,6 +2779,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "systemd" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f722cabda922e471742300045f56dbaa53fafbb4520fca304e51258019bfe91d" +dependencies = [ + "cstr-argument", + "foreign-types 0.5.0", + "libc", + "libsystemd-sys", + "log", + "memchr", + "utf8-cstr", +] + [[package]] name = "tar" version = "0.4.33" @@ -3276,6 +3346,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" +[[package]] +name = "utf8-cstr" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55bcbb425141152b10d5693095950b51c3745d019363fc2929ffd8f61449b628" + [[package]] name = "uuid" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 08dad1e..e51cbe5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ shellexpand = "2.1" stackable_config = { git = "https://github.com/stackabletech/common.git", branch = "main" } strum = { version = "0.20", features = ["derive"] } strum_macros = "0.20" +systemd = "0.8" tar = "0.4" thiserror = "1.0" tokio = { version = "1.5", features = ["macros", "rt-multi-thread", "time"] } @@ -56,4 +57,4 @@ systemd-units = { enable = false } assets = [ ["packaging/config/agent.conf", "etc/stackable/stackable-agent/", "644"], ["target/release/stackable-agent", "opt/stackable-agent/stackable-agent", "755"], -] \ No newline at end of file +] diff --git a/src/provider/mod.rs b/src/provider/mod.rs index 4f6589e..a5c2520 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -7,13 +7,16 @@ use anyhow::anyhow; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; use kube::{Api, Client}; use kubelet::backoff::ExponentialBackoffStrategy; -use kubelet::log::Sender; +use kubelet::log::{SendError, Sender}; use kubelet::node::Builder; use kubelet::pod::state::prelude::*; -use kubelet::pod::Pod; -use kubelet::provider::Provider; +use kubelet::pod::{Pod, PodKey}; +use kubelet::{ + container::{ContainerKey, ContainerMap}, + provider::Provider, +}; use log::{debug, error}; -use tokio::sync::RwLock; +use tokio::{runtime::Runtime, sync::RwLock, task}; use crate::provider::error::StackableError; use crate::provider::error::StackableError::{ @@ -25,7 +28,8 @@ use crate::provider::states::pod::terminated::Terminated; use crate::provider::states::pod::PodState; use crate::provider::systemdmanager::manager::SystemdManager; use kube::error::ErrorResponse; -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; +use systemdmanager::journal_reader; pub struct StackableProvider { shared: ProviderState, @@ -45,10 +49,124 @@ mod systemdmanager; /// Provider-level state shared between all pods #[derive(Clone)] pub struct ProviderState { + handles: Arc>, client: Client, systemd_manager: Arc, } +/// Contains handles for running pods. 
+/// +/// A `PodHandleMap` maps a pod key to a pod handle which in turn +/// contains/is a map from a container key to a container handle. +/// A container handle contains all necessary runtime information like the +/// name of the service unit. +/// +/// The implementation of `PodHandleMap` contains functions to access the +/// parts of this structure while preserving the invariants. +#[derive(Debug, Default)] +struct PodHandleMap { + handles: HashMap, +} + +impl PodHandleMap { + /// Returns the pod handle for the given key or [`None`] if not found. + pub fn get(&self, pod_key: &PodKey) -> Option<&PodHandle> { + self.handles.get(pod_key) + } + + /// Removes the pod handle with the given key and returns it. + pub fn remove(&mut self, pod_key: &PodKey) -> Option { + self.handles.remove(pod_key) + } + + /// Inserts a new [`ContainerHandle`] for the given pod and container key. + /// + /// A pod handle is created if not already existent. + pub fn insert_container_handle( + &mut self, + pod_key: &PodKey, + container_key: &ContainerKey, + container_handle: &ContainerHandle, + ) { + self.handles + .entry(pod_key.to_owned()) + .or_insert_with(ContainerMap::new) + .insert(container_key.to_owned(), container_handle.to_owned()); + } + + /// Sets the invocation ID for the given pod and container key. + /// + /// If there is no corresponding container handle then an error is + /// returned. + pub fn set_invocation_id( + &mut self, + pod_key: &PodKey, + container_key: &ContainerKey, + invocation_id: &str, + ) -> anyhow::Result<()> { + if let Some(mut container_handle) = self.container_handle_mut(pod_key, container_key) { + container_handle.invocation_id = Some(String::from(invocation_id)); + Ok(()) + } else { + Err(anyhow!( + "Invocation ID could not be stored. Container handle for + pod [{:?}] and container [{}] not found", + pod_key, + container_key + )) + } + } + + /// Returns a reference to the container handle with the given pod and + /// container key or [`None`] if not found. + pub fn container_handle( + &self, + pod_key: &PodKey, + container_key: &ContainerKey, + ) -> Option<&ContainerHandle> { + self.handles + .get(pod_key) + .and_then(|pod_handle| pod_handle.get(container_key)) + } + + /// Returns a mutable reference to the container handle with the given + /// pod and container key or [`None`] if not found. + fn container_handle_mut( + &mut self, + pod_key: &PodKey, + container_key: &ContainerKey, + ) -> Option<&mut ContainerHandle> { + self.handles + .get_mut(pod_key) + .and_then(|pod_handle| pod_handle.get_mut(container_key)) + } +} + +/// Represents a handle to a running pod. +type PodHandle = ContainerMap; + +/// Represents a handle to a running container. +#[derive(Clone, Debug)] +pub struct ContainerHandle { + /// Contains the name of the corresponding service unit. + /// Can be used as reference in [`crate::provider::systemdmanager::manager`]. + pub service_unit: String, + + /// Contains the systemd invocation ID which identifies the + /// corresponding entries in the journal. + pub invocation_id: Option, +} + +impl ContainerHandle { + /// Creates an instance with the given service unit name. 
+ pub fn new(service_unit: &str) -> Self { + ContainerHandle { + service_unit: String::from(service_unit), + invocation_id: None, + } + } +} + impl StackableProvider { pub async fn new( client: Client, @@ -61,6 +179,7 @@ impl StackableProvider { let systemd_manager = Arc::new(SystemdManager::new(session, Duration::from_secs(5))?); let provider_state = ProviderState { + handles: Default::default(), client, systemd_manager, }; @@ -187,17 +306,60 @@ impl Provider for StackableProvider { service_name, service_uid, package, - service_units: None, }) } async fn logs( &self, - _namespace: String, - _pod: String, - _container: String, - _sender: Sender, + namespace: String, + pod: String, + container: String, + mut sender: Sender, ) -> anyhow::Result<()> { + let pod_key = PodKey::new(&namespace, &pod); + let container_key = ContainerKey::App(container); + + debug!( + "Logs for pod [{:?}] and container [{:?}] requested", + pod_key, container_key + ); + + let maybe_container_handle = { + let handles = self.shared.handles.read().await; + handles + .container_handle(&pod_key, &container_key) + .map(ContainerHandle::to_owned) + }; + + let container_handle = maybe_container_handle.ok_or_else(|| { + anyhow!( + "Container handle for pod [{:?}] and container [{:?}] not found", + pod_key, + container_key + ) + })?; + let invocation_id = container_handle.invocation_id.ok_or_else(|| { + anyhow!( + "Invocation ID for container [{}] in pod [{:?}] is unknown. \ + The service is probably not started yet.", + container_key, + pod_key + ) + })?; + + task::spawn_blocking(move || { + let result = Runtime::new() + .unwrap() + .block_on(journal_reader::send_messages(&mut sender, &invocation_id)); + + if let Err(error) = result { + match error.downcast_ref::() { + Some(SendError::ChannelClosed) => (), + _ => error!("Log could not be sent. 
{}", error), + } + } + }); + Ok(()) } } diff --git a/src/provider/states/pod.rs b/src/provider/states/pod.rs index 68eeaab..d581447 100644 --- a/src/provider/states/pod.rs +++ b/src/provider/states/pod.rs @@ -5,7 +5,6 @@ use kubelet::pod::state::prelude::*; use kubelet::pod::{Pod, Status}; use crate::provider::repository::package::Package; -use crate::provider::systemdmanager::systemdunit::SystemDUnit; use crate::provider::ProviderState; pub(crate) mod creating_config; @@ -29,7 +28,6 @@ pub struct PodState { pub service_name: String, pub service_uid: String, pub package: Package, - pub service_units: Option>, } impl PodState { diff --git a/src/provider/states/pod/creating_service.rs b/src/provider/states/pod/creating_service.rs index cb43cf4..da887b0 100644 --- a/src/provider/states/pod/creating_service.rs +++ b/src/provider/states/pod/creating_service.rs @@ -1,11 +1,15 @@ use kubelet::pod::state::prelude::*; -use kubelet::pod::Pod; +use kubelet::{ + container::ContainerKey, + pod::{Pod, PodKey}, +}; use log::{debug, error, info}; use super::setup_failed::SetupFailed; use super::starting::Starting; use crate::provider::systemdmanager::systemdunit::SystemDUnit; -use crate::provider::{PodState, ProviderState}; +use crate::provider::{ContainerHandle, PodState, ProviderState}; +use anyhow::Error; use std::fs::create_dir_all; #[derive(Default, Debug, TransitionTo)] @@ -16,14 +20,14 @@ pub struct CreatingService; impl State for CreatingService { async fn next( self: Box, - provider_state: SharedState, + shared: SharedState, pod_state: &mut PodState, pod: Manifest, ) -> Transition { let pod = pod.latest(); let systemd_manager = { - let provider_state = provider_state.read().await; + let provider_state = shared.read().await; provider_state.systemd_manager.clone() }; @@ -39,7 +43,7 @@ impl State for CreatingService { pod_state.service_name, service_directory ); if let Err(error) = create_dir_all(service_directory) { - return Transition::Complete(Err(anyhow::Error::from(error))); + return Transition::Complete(Err(Error::from(error))); } } @@ -60,34 +64,25 @@ impl State for CreatingService { "Unable to create systemd unit template from pod [{}]: [{}]", service_name, pod_error ); - return Transition::Complete(Err(anyhow::Error::from(pod_error))); + return Transition::Complete(Err(Error::from(pod_error))); } }; // Each pod can map to multiple systemd units/services as each container will get its own // systemd unit file/service. // Map every container from the pod object to a systemdunit - let systemd_units: Vec = match pod - .containers() - .iter() - .map(|container| { - SystemDUnit::new( - &unit_template, - &service_prefix, - container, - user_mode, - pod_state, - ) - }) - .collect() - { - Ok(units) => units, - Err(err) => return Transition::Complete(Err(anyhow::Error::from(err))), - }; + for container in &pod.containers() { + let unit = match SystemDUnit::new( + &unit_template, + &service_prefix, + &container, + user_mode, + pod_state, + ) { + Ok(unit) => unit, + Err(err) => return Transition::Complete(Err(Error::from(err))), + }; - // This will iterate over all systemd units, write the service files to disk and link - // the service to systemd. - for unit in &systemd_units { // Create the service // As per ADR005 we currently write the unit files directly in the systemd // unit directory (by passing None as [unit_file_path]). 
@@ -103,10 +98,20 @@ impl State for CreatingService { return Transition::Complete(Err(e)); } } + + { + let provider_state = shared.write().await; + let mut handles = provider_state.handles.write().await; + handles.insert_container_handle( + &PodKey::from(&pod), + &ContainerKey::App(String::from(container.name())), + &ContainerHandle::new(&unit.get_name()), + ) + }; + // Done for now, if the service was created successfully we are happy // Starting and enabling comes in a later state after all service have been createddy } - pod_state.service_units = Some(systemd_units); // All services were loaded successfully, otherwise we'd have returned early above Transition::next(self, Starting) diff --git a/src/provider/states/pod/running.rs b/src/provider/states/pod/running.rs index 82e9b46..e32fe84 100644 --- a/src/provider/states/pod/running.rs +++ b/src/provider/states/pod/running.rs @@ -4,13 +4,13 @@ use k8s_openapi::api::core::v1::{ }; use krator::ObjectStatus; use kubelet::pod::state::prelude::*; -use kubelet::pod::Pod; +use kubelet::pod::{Pod, PodKey}; use log::{debug, info, trace}; use super::failed::Failed; use super::installing::Installing; use crate::provider::states::make_status_with_containers_and_condition; -use crate::provider::{PodState, ProviderState}; +use crate::provider::{PodHandle, PodState, ProviderState}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; use k8s_openapi::chrono; use tokio::time::Duration; @@ -33,13 +33,20 @@ impl Default for Running { impl State for Running { async fn next( mut self: Box, - provider_state: SharedState, + shared: SharedState, pod_state: &mut PodState, - _pod: Manifest, + pod: Manifest, ) -> Transition { - let systemd_manager = { - let provider_state = provider_state.read().await; - provider_state.systemd_manager.clone() + let pod = pod.latest(); + let pod_key = &PodKey::from(pod); + + let (systemd_manager, pod_handle) = { + let provider_state = shared.read().await; + let handles = provider_state.handles.read().await; + ( + provider_state.systemd_manager.clone(), + handles.get(&pod_key).map(PodHandle::to_owned), + ) }; // We loop here indefinitely and "wake up" periodically to check if the service is still @@ -57,20 +64,22 @@ impl State for Running { // Iterate over all units and check their state // if the [`service_units`] Option is a None variant, return a failed state // as we need to run something otherwise we are not doing anything - let systemd_units = match &pod_state.service_units { - Some(units) => units, - None => return Transition::Complete(Err(anyhow!(format!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name)))), + let containers = match &pod_handle { + Some(containers) => containers, + None => return Transition::Complete(Err(anyhow!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name))), }; - for unit in systemd_units { - match systemd_manager.is_running(&unit.get_name()) { + for container_handle in containers.values() { + let service_unit = &container_handle.service_unit; + + match systemd_manager.is_running(&service_unit) { Ok(true) => trace!( "Unit [{}] of service [{}] still running ...", - &unit.get_name(), + service_unit, pod_state.service_name ), Ok(false) => { - info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", pod_state.service_name, unit.get_name()); + info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", 
pod_state.service_name, service_unit); return Transition::next( self, Failed { @@ -81,9 +90,7 @@ impl State for Running { Err(dbus_error) => { info!( "Error querying ActiveState for Unit [{}] of service [{}]: [{}].", - pod_state.service_name, - unit.get_name(), - dbus_error + pod_state.service_name, service_unit, dbus_error ); return Transition::Complete(Err(dbus_error)); } diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs index 5006528..8a6dad2 100644 --- a/src/provider/states/pod/starting.rs +++ b/src/provider/states/pod/starting.rs @@ -1,122 +1,145 @@ -use kubelet::pod::state::prelude::*; -use kubelet::pod::Pod; - -use super::failed::Failed; use super::running::Running; -use super::setup_failed::SetupFailed; -use crate::provider::{PodState, ProviderState}; -use anyhow::anyhow; -use log::{debug, error, info, warn}; +use crate::provider::{ + systemdmanager::manager::SystemdManager, PodHandle, PodState, ProviderState, +}; + +use anyhow::{anyhow, Result}; +use kubelet::pod::state::prelude::*; +use kubelet::{ + container::ContainerKey, + pod::{Pod, PodKey}, +}; +use log::{debug, error, info}; use std::time::Instant; -use tokio::time::Duration; +use tokio::time::{self, Duration}; #[derive(Default, Debug, TransitionTo)] -#[transition_to(Running, Failed, SetupFailed)] +#[transition_to(Running)] pub struct Starting; #[async_trait::async_trait] impl State for Starting { async fn next( self: Box, - provider_state: SharedState, + shared: SharedState, pod_state: &mut PodState, - _: Manifest, + pod: Manifest, ) -> Transition { - let systemd_manager = { - let provider_state = provider_state.read().await; - provider_state.systemd_manager.clone() - }; - - if let Some(systemd_units) = &pod_state.service_units { - for unit in systemd_units { - match systemd_manager.is_running(&unit.get_name()) { - Ok(true) => { - debug!( - "Unit [{}] for service [{}] already running, nothing to do..", - &unit.get_name(), - &pod_state.service_name - ); - // Skip rest of loop as the service is already running - continue; - } - Err(dbus_error) => { - debug!( - "Error retrieving activestate of unit [{}] for service [{}]: [{}]", - &unit.get_name(), - &pod_state.service_name, - dbus_error - ); - return Transition::Complete(Err(dbus_error)); - } - _ => { // nothing to do, just keep going - } - } - info!("Starting systemd unit [{}]", unit); - if let Err(start_error) = systemd_manager.start(&unit.get_name()) { - error!( - "Error occurred starting systemd unit [{}]: [{}]", - unit.get_name(), - start_error - ); - return Transition::Complete(Err(start_error)); - } - - info!("Enabling systemd unit [{}]", unit); - if let Err(enable_error) = systemd_manager.enable(&unit.get_name()) { - error!( - "Error occurred starting systemd unit [{}]: [{}]", - unit.get_name(), - enable_error - ); - return Transition::Complete(Err(enable_error)); - } - - let start_time = Instant::now(); - // TODO: does this need to be configurable, or ar we happy with a hard coded value - // for now. 
I've briefly looked at the podspec and couldn't identify a good field - // to use for this - also, currently this starts containers (= systemd units) in - // order and waits 10 seconds for every unit, so a service with five containers - // would take 50 seconds until it reported running - which is totally fine in case - // the units actually depend on each other, but a case could be made for waiting - // once at the end - while start_time.elapsed().as_secs() < 10 { - tokio::time::sleep(Duration::from_secs(1)).await; - debug!( - "Checking if unit [{}] is still up and running.", - &unit.get_name() - ); - match systemd_manager.is_running(&unit.get_name()) { - Ok(true) => debug!( - "Service [{}] still running after [{}] seconds", - &unit.get_name(), - start_time.elapsed().as_secs() - ), - Ok(false) => { - return Transition::Complete(Err(anyhow!(format!( - "Unit [{}] stopped unexpectedly during startup after [{}] seconds.", - &unit.get_name(), - start_time.elapsed().as_secs() - )))) - } - Err(dbus_error) => return Transition::Complete(Err(dbus_error)), - } - } + let pod = pod.latest(); + + match start_service_units(shared, pod_state, &pod).await { + Ok(()) => Transition::next(self, Running::default()), + Err(error) => { + error!("{}", error); + Transition::Complete(Err(error)) } - } else { - warn!( - "No unit definitions found, not starting anything for pod [{}]!", - pod_state.service_name - ); } - Transition::next( - self, - Running { - ..Default::default() - }, + } + + async fn status(&self, _pod_state: &mut PodState, _pod: &Pod) -> Result { + Ok(make_status(Phase::Pending, "Starting")) + } +} + +/// Starts the service units for the containers of the given pod. +/// +/// The units are started and enabled if they are not already running. +/// The startup is considered successful if the unit is still running +/// after 10 seconds. +async fn start_service_units( + shared: SharedState, + pod_state: &PodState, + pod: &Pod, +) -> Result<()> { + let pod_key = &PodKey::from(pod); + + let (systemd_manager, pod_handle) = { + let provider_state = shared.read().await; + let handles = provider_state.handles.read().await; + ( + provider_state.systemd_manager.clone(), + handles.get(&pod_key).map(PodHandle::to_owned), ) + }; + + for (container_key, container_handle) in pod_handle.unwrap_or_default() { + let service_unit = &container_handle.service_unit; + + if systemd_manager.is_running(service_unit)? { + debug!( + "Unit [{}] for service [{}] is already running. Skip startup.", + service_unit, &pod_state.service_name + ); + } else { + info!("Starting systemd unit [{}]", service_unit); + systemd_manager.start(service_unit)?; + + info!("Enabling systemd unit [{}]", service_unit); + systemd_manager.enable(service_unit)?; + + // TODO: does this need to be configurable, or ar we happy with a hard coded value + // for now. 
I've briefly looked at the podspec and couldn't identify a good field + // to use for this - also, currently this starts containers (= systemd units) in + // order and waits 10 seconds for every unit, so a service with five containers + // would take 50 seconds until it reported running - which is totally fine in case + // the units actually depend on each other, but a case could be made for waiting + // once at the end + await_startup(&systemd_manager, service_unit, Duration::from_secs(10)).await?; + } + + let invocation_id = systemd_manager.get_invocation_id(service_unit)?; + store_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?; } - async fn status(&self, _pod_state: &mut PodState, _pod: &Pod) -> anyhow::Result { - Ok(make_status(Phase::Pending, &"Starting")) + Ok(()) +} + +/// Checks if the given service unit is still running after the given duration. +async fn await_startup( + systemd_manager: &SystemdManager, + service_unit: &str, + duration: Duration, +) -> Result<()> { + let start_time = Instant::now(); + while start_time.elapsed() < duration { + time::sleep(Duration::from_secs(1)).await; + + debug!( + "Checking if unit [{}] is still up and running.", + service_unit + ); + + if systemd_manager.is_running(service_unit)? { + debug!( + "Service [{}] still running after [{}] seconds", + service_unit, + start_time.elapsed().as_secs() + ); + } else { + return Err(anyhow!( + "Unit [{}] stopped unexpectedly during startup after [{}] seconds.", + service_unit, + start_time.elapsed().as_secs() + )); + } } + + Ok(()) +} + +/// Stores the given invocation ID into the corresponding container handle. +async fn store_invocation_id( + shared: SharedState, + pod_key: &PodKey, + container_key: &ContainerKey, + invocation_id: &str, +) -> Result<()> { + debug!( + "Set invocation ID [{}] for pod [{:?}] and container [{}].", + invocation_id, pod_key, container_key + ); + + let provider_state = shared.write().await; + let mut handles = provider_state.handles.write().await; + handles.set_invocation_id(&pod_key, &container_key, invocation_id) } diff --git a/src/provider/states/pod/terminated.rs b/src/provider/states/pod/terminated.rs index 5742bef..1b243f1 100644 --- a/src/provider/states/pod/terminated.rs +++ b/src/provider/states/pod/terminated.rs @@ -1,4 +1,4 @@ -use kubelet::pod::state::prelude::*; +use kubelet::pod::{state::prelude::*, PodKey}; use log::{error, info, warn}; use crate::provider::{PodState, ProviderState}; @@ -13,40 +13,48 @@ pub struct Terminated { impl State for Terminated { async fn next( self: Box, - provider_state: SharedState, + shared: SharedState, pod_state: &mut PodState, - _pod: Manifest, + pod: Manifest, ) -> Transition { info!( "Pod {} was terminated, stopping service!", &pod_state.service_name ); - let systemd_manager = { - let provider_state = provider_state.read().await; - provider_state.systemd_manager.clone() + let pod = pod.latest(); + let pod_key = &PodKey::from(pod); + + let (systemd_manager, pod_handle) = { + let provider_state = shared.write().await; + let mut handles = provider_state.handles.write().await; + ( + provider_state.systemd_manager.clone(), + handles.remove(&pod_key), + ) }; // TODO: We need some additional error handling here, wait for the services to actually // shut down and try to remove the rest of the services if one fails (tbd, do we want that?) 
- if let Some(systemd_units) = &pod_state.service_units { - for unit in systemd_units { - info!("Stopping systemd unit [{}]", unit); - if let Err(stop_error) = systemd_manager.stop(&unit.get_name()) { + if let Some(containers) = pod_handle { + for container_handle in containers.values() { + let service_unit = &container_handle.service_unit; + + info!("Stopping systemd unit [{}]", service_unit); + if let Err(stop_error) = systemd_manager.stop(service_unit) { error!( "Error occurred stopping systemd unit [{}]: [{}]", - unit.get_name(), - stop_error + service_unit, stop_error ); return Transition::Complete(Err(stop_error)); } // Daemon reload is false here, we'll do that once after all units have been removed - info!("Removing systemd unit [{}]", &unit); - if let Err(remove_error) = systemd_manager.remove_unit(&unit.get_name(), false) { + info!("Removing systemd unit [{}]", service_unit); + if let Err(remove_error) = systemd_manager.remove_unit(service_unit, false) { error!( "Error occurred removing systemd unit [{}]: [{}]", - unit, remove_error + service_unit, remove_error ); return Transition::Complete(Err(remove_error)); } diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs new file mode 100644 index 0000000..a3abf82 --- /dev/null +++ b/src/provider/systemdmanager/journal_reader.rs @@ -0,0 +1,120 @@ +//! This module provides functions for reading from the journal. + +use anyhow::{Error, Result}; +use kubelet::log::Sender; +use std::str; +use systemd::{journal, journal::JournalRef}; + +/// Reads journal entries with the given invocation ID and sends the +/// contained messages. +/// +/// The options `tail` and `follow` in [`sender`] are taken into account. +/// +/// If `tail` is set with `Some(line_count)` then only the last +/// `line_count` messages (or less if not enough available) are sent +/// otherwise all available messages are sent. +/// +/// If `follow` is `true` then additionally all new messages are sent +/// until the channel of [`sender`] is closed. In this case an +/// [`Err(kubelet::log::SendError::ChannelClosed)`] will be returned. +pub async fn send_messages(sender: &mut Sender, invocation_id: &str) -> Result<()> { + let mut journal = journal::OpenOptions::default().open()?; + let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?; + + if let Some(line_count) = sender.tail() { + seek_journal_backwards(journal, line_count)?; + + if sender.follow() { + send_remaining_messages(journal, sender).await?; + } else { + send_n_messages(journal, sender, line_count).await?; + } + } else { + send_remaining_messages(journal, sender).await?; + } + + while sender.follow() { + journal.wait(None)?; + send_remaining_messages(journal, sender).await?; + } + + Ok(()) +} + +/// Sets the cursor of the journal to the position before the last `count` +/// entries so that the next entry is the first of `count` remaining +/// entries. If the beginning of the journal is reached then the cursor is +/// set to the position before the first entry. +fn seek_journal_backwards(journal: &mut JournalRef, count: usize) -> Result<()> { + journal.seek_tail()?; + + let entries_to_skip = count + 1; + let skipped = journal.previous_skip(entries_to_skip as u64)?; + let beginning_reached = skipped < entries_to_skip; + if beginning_reached { + journal.seek_head()?; + } + + Ok(()) +} + +/// Sends the given number of messages from the journal. 
+async fn send_n_messages( + journal: &mut JournalRef, + sender: &mut Sender, + count: usize, +) -> Result<()> { + let mut sent = 0; + let mut message_available = true; + while sent != count && message_available { + if let Some(message) = next_message(journal)? { + send_message(sender, &message).await?; + sent += 1; + } else { + message_available = false; + } + } + Ok(()) +} + +/// Sends the remaining messages from the journal. +async fn send_remaining_messages(journal: &mut JournalRef, sender: &mut Sender) -> Result<()> { + while let Some(message) = next_message(journal)? { + send_message(sender, &message).await?; + } + Ok(()) +} + +/// Retrieves the message of the next entry from the journal. +/// +/// Returns [`Ok(Some(message))`] if a message could be successfully retrieved +/// and advances the position in the journal. If the journal entry has no +/// message assigned then `message` is an empty string. +/// Returns [`Ok(None)`] if there are no new entries. +/// Returns [`Err(error)`] if the journal could not be read. +fn next_message(journal: &mut JournalRef) -> Result<Option<String>> { + let maybe_message = if journal.next()? != 0 { + let message = if let Some(entry) = journal.get_data("MESSAGE")? { + if let Some(value) = entry.value() { + String::from_utf8_lossy(value).into() + } else { + // The MESSAGE field contains no text, i.e. `MESSAGE=`. + String::new() + } + } else { + // The journal entry contains no MESSAGE field. + String::new() + }; + Some(message) + } else { + None + }; + Ok(maybe_message) +} + +/// Sends the given message with a newline character. +async fn send_message(sender: &mut Sender, message: &str) -> Result<()> { + let mut line = message.to_owned(); + line.push('\n'); + sender.send(line).await.map_err(Error::new) +} diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 1c55a81..918a5ef 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -7,7 +7,7 @@ use crate::provider::systemdmanager::systemdunit::SystemDUnit; use crate::provider::StackableError; use crate::provider::StackableError::RuntimeError; use anyhow::anyhow; -use dbus::arg::{AppendAll, ReadAll, Variant}; +use dbus::arg::{AppendAll, Get, ReadAll, RefArg, Variant}; use dbus::blocking::SyncConnection; use dbus::strings::Member; use dbus::Path; @@ -373,7 +373,31 @@ impl SystemdManager { } } - pub fn is_running(&self, unit: &str) -> Result { + /// Checks if the ActiveState of the given unit is set to active. + pub fn is_running(&self, unit: &str) -> anyhow::Result<bool> { + self.get_value::<String>(unit, "ActiveState") + .map(|v| v.as_str() == Some("active")) + .map_err(|dbus_error| { + anyhow!( + "Error receiving ActiveState of unit [{}]. {}", + unit, + dbus_error + ) + }) + } + + /// Retrieves the invocation ID for the given unit. + pub fn get_invocation_id(&self, unit: &str) -> anyhow::Result<String> { + self.get_value::<Vec<u8>>(unit, "InvocationID") + .map(|Variant(vec)| vec.iter().map(|byte| format!("{:02x}", byte)).collect()) + } + + /// Retrieves the value for the given property of the given unit. + pub fn get_value<T: for<'a> Get<'a>>( + &self, + unit: &str, + property: &str, + ) -> anyhow::Result<Variant<T>> { // We are using `LoadUnit` here, as GetUnit can fail seemingly at random, when the unit // is not loaded due to systemd garbage collection. 
// see https://github.com/systemd/systemd/issues/1929 for more information @@ -385,17 +409,15 @@ impl SystemdManager { .connection .with_proxy(SYSTEMD_DESTINATION, &unit_node, self.timeout); - let active_state = proxy + let value = proxy .method_call( DBUS_PROPERTIES_INTERFACE, "Get", - ("org.freedesktop.systemd1.Unit", "ActiveState"), + ("org.freedesktop.systemd1.Unit", property), ) - .map(|r: (Variant<String>,)| r.0)?; + .map(|r: (Variant<T>,)| r.0)?; - // TODO: I think this can panic, there should be a get() method on Variant that returns - // an option, but I've not yet been able to get that to work - Ok(active_state.0 == "active") + Ok(value) } // Symlink a unit file into the systemd unit folder diff --git a/src/provider/systemdmanager/mod.rs b/src/provider/systemdmanager/mod.rs index d87d4b5..8c2df7b 100644 --- a/src/provider/systemdmanager/mod.rs +++ b/src/provider/systemdmanager/mod.rs @@ -1,2 +1,3 @@ +pub mod journal_reader; pub mod manager; pub mod systemdunit;
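
The new `logs()` implementation bridges the blocking journal API to the async kubelet log sender by spawning a blocking task that drives its own Tokio runtime. Below is a minimal, self-contained sketch of that pattern in isolation; `read_journal_async` is a hypothetical stand-in for `journal_reader::send_messages`, and the invocation ID value is made up.

use tokio::{runtime::Runtime, task};

// Hypothetical stand-in for `journal_reader::send_messages`: an async
// function that internally performs blocking journal reads.
async fn read_journal_async(invocation_id: String) -> anyhow::Result<()> {
    println!("would stream journal entries for invocation {}", invocation_id);
    Ok(())
}

#[tokio::main]
async fn main() {
    let invocation_id = String::from("0123456789abcdef0123456789abcdef");

    // Same shape as in `StackableProvider::logs`: the closure runs on a
    // dedicated blocking thread, creates a fresh runtime there and blocks on
    // the async journal reader, so the main runtime's worker threads are
    // never stalled by journal I/O. The patch does not await the handle
    // because `logs()` returns immediately; this sketch awaits it only so
    // the example runs to completion.
    task::spawn_blocking(move || {
        let result = Runtime::new()
            .expect("failed to create runtime")
            .block_on(read_journal_async(invocation_id));

        if let Err(error) = result {
            eprintln!("log streaming failed: {}", error);
        }
    })
    .await
    .expect("blocking task panicked");
}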
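
The journal reader matches entries on `_SYSTEMD_INVOCATION_ID`, which holds the invocation ID as a lowercase hex string, while the `InvocationID` property read over D-Bus arrives as raw bytes. A small sketch of the conversion that `get_invocation_id` performs; the byte values are made up for illustration.

/// Converts raw invocation ID bytes into the lowercase hex form used by
/// the journal's `_SYSTEMD_INVOCATION_ID` field.
fn to_invocation_id(bytes: &[u8]) -> String {
    bytes.iter().map(|byte| format!("{:02x}", byte)).collect()
}

fn main() {
    // Made-up example bytes; a real invocation ID is 16 bytes (128 bits).
    let raw: [u8; 4] = [0x6e, 0x03, 0xfd, 0x58];
    assert_eq!(to_invocation_id(&raw), "6e03fd58");
}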