diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3095261..e66f712 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,12 @@
 
 ## [Unreleased]
 
+### Added
+- Cleanup stage added where systemd units without corresponding pods are
+  removed on startup ([#312]).
+
+[#312]: https://github.com/stackabletech/agent/pull/312
+
 ## [0.6.1] - 2021-09-14
 
 ### Changed
diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index 80a8c28..f237078 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -5,5 +5,8 @@
 * xref:limitations.adoc[]
 * xref:services.adoc[]
 * xref:jobs.adoc[]
+* Stages
+** xref:stages/overview.adoc[]
+** xref:stages/cleanup.adoc[]
 * Monitoring
 ** xref:monitoring/logs.adoc[]
diff --git a/docs/modules/ROOT/pages/stages/cleanup.adoc b/docs/modules/ROOT/pages/stages/cleanup.adoc
new file mode 100644
index 0000000..8a989a3
--- /dev/null
+++ b/docs/modules/ROOT/pages/stages/cleanup.adoc
@@ -0,0 +1,8 @@
+= Cleanup stage
+
+On startup the systemd units in the `system-stackable` slice are
+compared to the pods assigned to this node. If a systemd unit matches
+the pod specification, it is kept, and the Stackable Agent takes
+ownership of it again in a later stage. If there is no corresponding
+pod, or the systemd unit differs from the pod specification, the unit
+is removed and the Stackable Agent creates a new one afterwards.
diff --git a/docs/modules/ROOT/pages/stages/overview.adoc b/docs/modules/ROOT/pages/stages/overview.adoc
new file mode 100644
index 0000000..326c356
--- /dev/null
+++ b/docs/modules/ROOT/pages/stages/overview.adoc
@@ -0,0 +1,26 @@
+= Overview
+
+When the Stackable Agent starts, it runs through the following stages:
+
+* Check the configured directories and files.
+** Check that the optional files can be opened if they exist.
+** Create the directories which require write access and do not exist
+   yet.
+** Check that the configured directories are writable by the current
+   process.
+* Bootstrap the cluster with TLS certificates, but only if no existing
+  kubeconfig can be found.
+* Remove all systemd units from a previous run which have no
+  corresponding pod (see xref:stages/cleanup.adoc[]).
+* Start the kubelet.
+
+After the kubelet has started, assigned pods run through the following
+stages:
+
+* Download the package from a registered Stackable repository.
+* Unpack the package and install it.
+* Create the configuration files according to the config maps.
+* Create, start, and enable the systemd units.
+* Monitor the systemd units and patch the pod status accordingly.
+* Stop, disable, and remove the systemd units on termination or when
+  the pod is deleted.
diff --git a/src/bin/stackable-agent.rs b/src/bin/stackable-agent.rs
index b6272f8..c4b57e8 100644
--- a/src/bin/stackable-agent.rs
+++ b/src/bin/stackable-agent.rs
@@ -135,6 +135,8 @@ async fn main() -> anyhow::Result<()> {
         .await
         .expect("Error initializing provider.");
 
+    provider.cleanup(&krustlet_config.node_name).await;
+
     let kubelet = Kubelet::new(provider, kubeconfig, krustlet_config).await?;
     kubelet.start().await
 }
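
Read together with the stage list in overview.adoc, the hook above slots the cleanup in between provider initialization and kubelet startup. The following is a minimal sketch of that ordering, not the agent's actual API: apart from `cleanup`, every name is a hypothetical stand-in with a stub body so the sketch compiles.

    // Hypothetical condensation of the startup stages from overview.adoc;
    // the stubs stand in for the real checks, bootstrap, and kubelet start.
    struct Provider;

    impl Provider {
        async fn cleanup(&self, _node_name: &str) {
            // Best effort: errors are logged inside, never returned.
        }
    }

    fn check_directories_and_files() -> anyhow::Result<()> { Ok(()) }
    async fn bootstrap_tls_if_kubeconfig_missing() -> anyhow::Result<()> { Ok(()) }
    async fn start_kubelet() -> anyhow::Result<()> { Ok(()) }

    async fn startup(provider: Provider, node_name: &str) -> anyhow::Result<()> {
        check_directories_and_files()?;               // stage 1: directories and files
        bootstrap_tls_if_kubeconfig_missing().await?; // stage 2: TLS bootstrap, only without a kubeconfig
        provider.cleanup(node_name).await;            // stage 3: remove stale systemd units
        start_kubelet().await                         // stage 4: pods then run through the pod stages
    }
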
diff --git a/src/provider/cleanup.rs b/src/provider/cleanup.rs
new file mode 100644
index 0000000..215d4ee
--- /dev/null
+++ b/src/provider/cleanup.rs
@@ -0,0 +1,245 @@
+//! Initial cleanup
+//!
+//! On startup the systemd units in the `system-stackable` slice are compared to the pods assigned
+//! to this node. If a systemd unit matches the pod specification then it is kept and the
+//! Stackable Agent will take ownership again in the `Starting` stage. If there is no corresponding
+//! pod or the systemd unit differs from the pod specification then it is removed and the
+//! Stackable Agent will create a new systemd unit in the `CreatingService` stage.
+//!
+//! The cleanup stage is implemented as part of the [`StackableProvider`] because the expected
+//! content of a systemd unit file can only be determined with the directories configured in the
+//! provider.
+//!
+//! The cleanup code resides in a separate module because the amount of code justifies it and
+//! because the log output becomes more meaningful: it is immediately clear whether a systemd unit
+//! was removed in the cleanup stage or in the normal process.
+use std::collections::HashMap;
+
+use anyhow::Context;
+use k8s_openapi::api::core::v1::Pod as KubePod;
+use kube::api::{ListParams, Meta, ObjectList};
+use kube::Api;
+use kubelet::pod::Pod;
+use kubelet::provider::Provider;
+use log::{debug, error, info, warn};
+use tokio::fs::{read_to_string, remove_file};
+
+use super::systemdmanager::systemdunit::SystemDUnit;
+use super::systemdmanager::systemdunit::STACKABLE_SLICE;
+use super::StackableProvider;
+
+impl StackableProvider {
+    /// Removes systemd units without corresponding pods.
+    ///
+    /// The systemd units in the `system-stackable` slice are compared with the pods assigned to
+    /// this node and all units without corresponding pods or which differ from the pod
+    /// specifications are removed.
+    pub async fn cleanup(&self, node_name: &str) {
+        let systemd_manager = &self.shared.systemd_manager;
+
+        if let Err(error) = systemd_manager.reload().await {
+            error!(
+                "Skipping the cleanup stage because the systemd daemon reload failed. {}",
+                error
+            );
+            return;
+        }
+
+        let units_in_slice = match systemd_manager.slice_content(STACKABLE_SLICE).await {
+            Ok(units_in_slice) => units_in_slice,
+            Err(error) => {
+                debug!(
+                    "Skipping the cleanup stage because no systemd units were found in the slice \
+                    [{}]. {}",
+                    STACKABLE_SLICE, error
+                );
+                return;
+            }
+        };
+
+        let pods = match self.assigned_pods(node_name).await {
+            Ok(pods) => pods.items,
+            Err(error) => {
+                error!(
+                    "The assigned pods could not be retrieved. All systemd units in the slice [{}] \
+                    will be removed. {}",
+                    STACKABLE_SLICE, error
+                );
+                Vec::new()
+            }
+        };
+
+        let mut units_from_pods = HashMap::new();
+        for pod in pods {
+            let pod_terminating = pod.metadata.deletion_timestamp.is_some();
+
+            match self.units_from_pod(&pod).await {
+                Ok(units) => {
+                    for (unit_name, content) in units {
+                        units_from_pods.insert(unit_name, (content, pod_terminating));
+                    }
+                }
+                Err(error) => warn!(
+                    "Systemd units could not be generated for pod [{}/{}]. {}",
+                    pod.namespace().unwrap_or_else(|| String::from("default")),
+                    pod.name(),
+                    error
+                ),
+            }
+        }
+
+        let mut unit_removed = false;
+
+        for unit_name in &units_in_slice {
+            let remove_unit = match units_from_pods.get(unit_name) {
+                Some((expected_content, pod_terminating)) => {
+                    match self.unit_file_content(unit_name).await {
+                        Ok(Some(content)) if &content == expected_content && !pod_terminating => {
+                            info!(
+                                "The systemd unit [{}] will be kept because a corresponding pod \
+                                exists.",
+                                unit_name
+                            );
+                            false
+                        }
+                        Ok(Some(_)) if *pod_terminating => {
+                            info!(
+                                "The systemd unit [{}] will be removed because the corresponding \
+                                pod is terminating.",
+                                unit_name
+                            );
+                            true
+                        }
+                        Ok(Some(content)) => {
+                            info!(
+                                "The systemd unit [{}] will be removed because it differs from the \
+                                corresponding pod specification.\n\
+                                expected content:\n\
+                                {}\n\n\
+                                actual content:\n\
+                                {}",
+                                unit_name, expected_content, content
+                            );
+                            true
+                        }
+                        Ok(None) => {
+                            info!(
+                                "The systemd unit [{}] will be removed because its file path could \
+                                not be determined.",
+                                unit_name
+                            );
+                            true
+                        }
+                        Err(error) => {
+                            warn!(
+                                "The systemd unit [{}] will be removed because the file content \
+                                could not be retrieved. {}",
+                                unit_name, error
+                            );
+                            true
+                        }
+                    }
+                }
+                None => {
+                    info!(
+                        "The systemd unit [{}] will be removed because no corresponding pod \
+                        exists.",
+                        unit_name
+                    );
+                    true
+                }
+            };
+
+            if remove_unit {
+                self.remove_unit(unit_name).await;
+                unit_removed = true;
+            }
+        }
+
+        if unit_removed {
+            let _ = systemd_manager.reload().await;
+        }
+    }
+
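The match above boils down to a single rule: a unit survives the cleanup only if its on-disk content exactly matches the content rendered from a live, non-terminating pod. A self-contained sketch of that rule (not part of the patch; types reduced to plain strings):

    /// `actual` is the unit file content read from disk (None if pathless or
    /// unreadable); `expected` pairs the content rendered from the pod with a
    /// flag saying whether the pod is terminating (None if no pod exists).
    fn should_remove(actual: Option<&str>, expected: Option<(&str, bool)>) -> bool {
        match (actual, expected) {
            // A readable unit backed by a live pod is kept only on an exact match.
            (Some(actual), Some((expected, false))) => actual != expected,
            // No pod, a terminating pod, or an unreadable unit file: remove.
            _ => true,
        }
    }

Every removal path also sets `unit_removed`, so the final daemon reload runs at most once regardless of how many units are deleted.
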
{}", + pod.namespace().unwrap_or_else(|| String::from("default")), + pod.name(), + error + ), + } + } + + let mut unit_removed = false; + + for unit_name in &units_in_slice { + let remove_unit = match units_from_pods.get(unit_name) { + Some((expected_content, pod_terminating)) => { + match self.unit_file_content(unit_name).await { + Ok(Some(content)) if &content == expected_content && !pod_terminating => { + info!( + "The systemd unit [{}] will be kept because a corresponding pod \ + exists.", + unit_name + ); + false + } + Ok(Some(_)) if *pod_terminating => { + info!( + "The systemd unit [{}] will be removed because the corresponding \ + pod is terminating.", + unit_name + ); + true + } + Ok(Some(content)) => { + info!( + "The systemd unit [{}] will be removed because it differs from the \ + corresponding pod specification.\n\ + expected content:\n\ + {}\n\n\ + actual content:\n\ + {}", + unit_name, expected_content, content + ); + true + } + Ok(None) => { + info!( + "The systemd unit [{}] will be removed because its file path could \ + not be determined.", + unit_name + ); + true + } + Err(error) => { + warn!( + "The systemd unit [{}] will be removed because the file content \ + could not be retrieved. {}", + unit_name, error + ); + true + } + } + } + None => { + info!( + "The systemd unit [{}] will be removed because no corresponding pod \ + exists.", + unit_name + ); + true + } + }; + + if remove_unit { + self.remove_unit(unit_name).await; + unit_removed = true; + } + } + + if unit_removed { + let _ = systemd_manager.reload().await; + } + } + + /// Returns a list of all pods assigned to the given node. + async fn assigned_pods(&self, node_name: &str) -> anyhow::Result> { + let client = &self.shared.client; + + let api: Api = Api::all(client.to_owned()); + let lp = ListParams::default().fields(&format!("spec.nodeName={}", node_name)); + api.list(&lp).await.with_context(|| { + format!( + "The pods assigned to this node (nodeName = [{}]) could not be retrieved.", + node_name + ) + }) + } + + /// Creates the systemd unit files for the given pod in memory. + /// + /// A mapping from systemd unit file names to the file content is returned. + async fn units_from_pod(&self, kubepod: &KubePod) -> anyhow::Result> { + let systemd_manager = &self.shared.systemd_manager; + + let mut units = HashMap::new(); + let pod = Pod::from(kubepod.to_owned()); + let pod_state = self.initialize_pod_state(&pod).await?; + + for container in pod.containers() { + let unit = SystemDUnit::new( + systemd_manager.is_user_mode(), + &pod_state, + &self.shared.kubeconfig_path, + &pod, + &container, + )?; + units.insert(unit.get_name(), unit.get_unit_file_content()); + } + + Ok(units) + } + + /// Returns the content of the given systemd unit file. + async fn unit_file_content(&self, unit_name: &str) -> anyhow::Result> { + let systemd_manager = &self.shared.systemd_manager; + + let file_path_result = systemd_manager + .fragment_path(unit_name) + .await + .with_context(|| { + format!( + "The file path of the unit [{}] could not be determined.", + unit_name + ) + }); + + match file_path_result { + Ok(Some(file_path)) => { + let file_content = read_to_string(&file_path) + .await + .with_context(|| format!("The file [{}] could not be read.", file_path))?; + Ok(Some(file_content)) + } + Ok(None) => Ok(None), + Err(error) => Err(error), + } + } + + /// Stops, disables and removes the given systemd unit. 
diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index b6ca1d5..e760773 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -1,21 +1,22 @@
+use std::collections::HashMap;
 use std::convert::TryFrom;
-use std::fs;
+use std::env;
 use std::net::IpAddr;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use anyhow::anyhow;
+use dirs::home_dir;
 use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
+use kube::error::ErrorResponse;
 use kube::{Api, Client};
 use kubelet::backoff::ExponentialBackoffStrategy;
+use kubelet::container::{ContainerKey, ContainerMap};
 use kubelet::log::{SendError, Sender};
 use kubelet::node::Builder;
 use kubelet::pod::state::prelude::*;
 use kubelet::pod::{Pod, PodKey};
-use kubelet::{
-    container::{ContainerKey, ContainerMap},
-    provider::Provider,
-};
+use kubelet::provider::Provider;
 use log::{debug, error};
 use tokio::{runtime::Runtime, sync::RwLock, task};
 
@@ -27,12 +28,10 @@ use crate::provider::error::StackableError::{
 use crate::provider::repository::package::Package;
 use crate::provider::states::pod::PodState;
 use crate::provider::systemdmanager::manager::SystemdManager;
-use kube::error::ErrorResponse;
-use std::collections::HashMap;
-use systemdmanager::journal_reader;
 
-use self::states::pod::{initializing::Initializing, terminated::Terminated};
-use self::systemdmanager::service::SystemdService;
+use states::pod::{initializing::Initializing, terminated::Terminated};
+use systemdmanager::journal_reader;
+use systemdmanager::service::SystemdService;
 
 pub struct StackableProvider {
     shared: ProviderState,
@@ -44,6 +43,7 @@ pub struct StackableProvider {
 
 pub const CRDS: &[&str] = &["repositories.stable.stackable.de"];
 
+pub mod cleanup;
 mod error;
 pub mod kubernetes;
 mod repository;
@@ -57,6 +57,7 @@ pub struct ProviderState {
     client: Client,
     systemd_manager: Arc<SystemdManager>,
     server_ip_address: IpAddr,
+    kubeconfig_path: PathBuf,
 }
 
 /// Contains handles for running pods.
@@ -134,11 +135,19 @@ impl StackableProvider {
     ) -> Result<Self, StackableError> {
         let systemd_manager = Arc::new(SystemdManager::new(agent_config.session, max_pods).await?);
 
+        let kubeconfig_path = find_kubeconfig().ok_or_else(|| StackableError::RuntimeError {
+            msg: String::from(
+                "Kubeconfig file not found. If no kubeconfig is present then the Stackable Agent \
+                should have generated one.",
+            ),
+        })?;
+
         let provider_state = ProviderState {
             handles: Default::default(),
             client,
             systemd_manager,
             server_ip_address: agent_config.server_ip_address,
+            kubeconfig_path,
         };
 
         let provider = StackableProvider {
@@ -206,6 +215,15 @@ impl StackableProvider {
     }
 }
 
+/// Tries to find the kubeconfig file in the environment variable `KUBECONFIG` and on the path
+/// `$HOME/.kube/config`.
+fn find_kubeconfig() -> Option<PathBuf> {
+    let env_var = env::var_os("KUBECONFIG").map(PathBuf::from);
+    let default_path = || home_dir().map(|home| home.join(".kube").join("config"));
+
+    env_var.or_else(default_path).filter(|path| path.exists())
+}
+
 #[async_trait::async_trait]
 impl Provider for StackableProvider {
     type ProviderState = ProviderState;
@@ -243,16 +261,9 @@ impl Provider for StackableProvider {
         let parcel_directory = self.parcel_directory.clone();
         // TODO: make this configurable
         let download_directory = parcel_directory.join("_download");
-        let config_directory = self.config_directory.clone();
         let log_directory = self.log_directory.clone();
 
         let package = Self::get_package(pod)?;
-        if !(&download_directory.is_dir()) {
-            fs::create_dir_all(&download_directory)?;
-        }
-        if !(&config_directory.is_dir()) {
-            fs::create_dir_all(&config_directory)?;
-        }
 
         Ok(PodState {
             parcel_directory,
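
One subtlety of `find_kubeconfig` worth noting: `.filter(|path| path.exists())` applies to whichever candidate was chosen first, so a `KUBECONFIG` variable pointing at a missing file yields `None` instead of falling back to `$HOME/.kube/config`. A hypothetical test sketch, assuming a `tempfile` dev-dependency (environment-variable tests like this must not run in parallel):

    #[test]
    fn env_var_wins_but_must_exist() {
        let file = tempfile::NamedTempFile::new().unwrap();
        std::env::set_var("KUBECONFIG", file.path());
        assert_eq!(find_kubeconfig(), Some(file.path().to_path_buf()));

        // No fallback to $HOME/.kube/config when the variable is set but stale.
        std::env::set_var("KUBECONFIG", "/does/not/exist");
        assert_eq!(find_kubeconfig(), None);
    }
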
diff --git a/src/provider/states/pod/creating_service.rs b/src/provider/states/pod/creating_service.rs
index 0cc7a3f..96e408c 100644
--- a/src/provider/states/pod/creating_service.rs
+++ b/src/provider/states/pod/creating_service.rs
@@ -1,19 +1,15 @@
+use std::fs::create_dir_all;
+
+use anyhow::{Context, Error};
+use kubelet::container::ContainerKey;
 use kubelet::pod::state::prelude::*;
-use kubelet::{
-    container::ContainerKey,
-    pod::{Pod, PodKey},
-};
-use log::{debug, error, info, warn};
+use kubelet::pod::{Pod, PodKey};
+use log::{debug, error, info};
 
 use super::setup_failed::SetupFailed;
 use super::starting::Starting;
 use crate::provider::systemdmanager::systemdunit::SystemDUnit;
 use crate::provider::{ContainerHandle, PodState, ProviderState};
-use anyhow::{Context, Error};
-use dirs::home_dir;
-use std::env;
-use std::fs::create_dir_all;
-use std::path::PathBuf;
 
 #[derive(Default, Debug, TransitionTo)]
 #[transition_to(Starting, SetupFailed)]
@@ -29,9 +25,12 @@ impl State<PodState> for CreatingService {
     ) -> Transition<PodState> {
         let pod = pod.latest();
 
-        let systemd_manager = {
+        let (systemd_manager, kubeconfig_path) = {
             let provider_state = shared.read().await;
-            provider_state.systemd_manager.clone()
+            (
+                provider_state.systemd_manager.clone(),
+                provider_state.kubeconfig_path.clone(),
+            )
         };
 
         let service_name: &str = pod_state.service_name.as_ref();
@@ -50,66 +49,21 @@
             }
         }
 
-        let user_mode = systemd_manager.is_user_mode();
-
-        // Naming schema
-        // Service name: `namespace-podname`
-        // SystemdUnit: `namespace-podname-containername`
-        // TODO: add this to the docs in more detail
-        let service_prefix = format!("{}-{}-", pod.namespace(), pod.name());
-
-        // Create a template from those settings that are derived directly from the pod, not
-        // from container objects
-        let unit_template = match SystemDUnit::new_from_pod(&pod, user_mode) {
-            Ok(unit) => unit,
-            Err(pod_error) => {
-                error!(
-                    "Unable to create systemd unit template from pod [{}]: [{}]",
-                    service_name, pod_error
-                );
-                return Transition::Complete(Err(Error::from(pod_error)));
-            }
-        };
-
         // Each pod can map to multiple systemd units/services as each container will get its own
         // systemd unit file/service.
         // Map every container from the pod object to a systemdunit
         for container in &pod.containers() {
-            let mut unit = match SystemDUnit::new(
-                &unit_template,
-                &service_prefix,
-                container,
-                user_mode,
+            let unit = match SystemDUnit::new(
+                systemd_manager.is_user_mode(),
                 pod_state,
+                &kubeconfig_path,
+                &pod,
+                container,
             ) {
                 Ok(unit) => unit,
                 Err(err) => return Transition::Complete(Err(Error::from(err))),
             };
 
-            if let Some(kubeconfig_path) = find_kubeconfig() {
-                const UNIT_ENV_KEY: &str = "KUBECONFIG";
-                if let Some(kubeconfig_path) = kubeconfig_path.to_str() {
-                    unit.add_env_var(UNIT_ENV_KEY, kubeconfig_path);
-                } else {
-                    warn!(
-                        "Environment variable {} cannot be added to \
-                        the systemd service [{}] because the path [{}] \
-                        is not valid unicode.",
-                        UNIT_ENV_KEY,
-                        service_name,
-                        kubeconfig_path.to_string_lossy()
-                    );
-                }
-            } else {
-                warn!(
-                    "Kubeconfig file not found. It will not be added \
-                    to the environment variables of the systemd \
-                    service [{}]. If no kubeconfig is present then the \
-                    Stackable agent should have generated one.",
-                    service_name
-                );
-            }
-
             // Create the service
             // As per ADR005 we currently write the unit files directly in the systemd
             // unit directory (by passing None as [unit_file_path]).
@@ -162,12 +116,3 @@
         Ok(make_status(Phase::Pending, "CreatingService"))
     }
 }
-
-/// Tries to find the kubeconfig file in the environment variable
-/// `KUBECONFIG` and on the path `$HOME/.kube/config`
-fn find_kubeconfig() -> Option<PathBuf> {
-    let env_var = env::var_os("KUBECONFIG").map(PathBuf::from);
-    let default_path = || home_dir().map(|home| home.join(".kube").join("config"));
-
-    env_var.or_else(default_path).filter(|path| path.exists())
-}
diff --git a/src/provider/states/pod/downloading.rs b/src/provider/states/pod/downloading.rs
index fec257c..1906a25 100644
--- a/src/provider/states/pod/downloading.rs
+++ b/src/provider/states/pod/downloading.rs
@@ -1,8 +1,10 @@
 use std::path::Path;
 
+use anyhow::Context;
 use kubelet::pod::state::prelude::*;
 use kubelet::pod::Pod;
 use log::{debug, error, info, warn};
+use tokio::fs::create_dir_all;
 
 use super::downloading_backoff::DownloadingBackoff;
 use super::installing::Installing;
@@ -71,6 +73,13 @@
             &package, &repo
         );
         let download_directory = pod_state.download_directory.clone();
+
+        if !(download_directory.is_dir()) {
+            if let Err(error) = create_download_directory(&download_directory).await {
+                return Transition::Complete(Err(error));
+            }
+        };
+
         let download_result = repo
             .download_package(&package, download_directory.clone())
             .await;
@@ -135,3 +144,13 @@
         Ok(make_status(Phase::Pending, "Downloading"))
     }
 }
+
+async fn create_download_directory(download_directory: &Path) -> anyhow::Result<()> {
+    info!("Creating download directory [{:?}].", download_directory);
+    create_dir_all(&download_directory).await.with_context(|| {
+        format!(
+            "Download directory [{}] could not be created.",
+            download_directory.to_string_lossy()
+        )
+    })
+}
diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs
index 2217899..d0bcf5c 100644
--- a/src/provider/systemdmanager/manager.rs
+++ b/src/provider/systemdmanager/manager.rs
@@ -5,8 +5,8 @@
 //!
 use super::service::SystemdService;
 use super::systemd1_api::{
-    AsyncJobProxy, AsyncManagerProxy, JobRemovedResult, JobRemovedSignal, ManagerSignals,
-    StartMode, StopMode,
+    AsyncJobProxy, AsyncManagerProxy, AsyncUnitProxy, JobRemovedResult, JobRemovedSignal,
+    ManagerSignals, StartMode, StopMode,
 };
 use crate::provider::systemdmanager::systemdunit::SystemDUnit;
 use crate::provider::StackableError;
@@ -388,6 +388,41 @@
         SystemdService::new(unit, &self.proxy).await
     }
 
+    /// Returns the file path of the given unit if there is one.
+    pub async fn fragment_path(&self, unit: &str) -> anyhow::Result<Option<String>> {
+        let unit_proxy = self.create_unit_proxy(unit).await?;
+        let fragment_path = unit_proxy.fragment_path().await?;
+
+        let file_path = if fragment_path.is_empty() {
+            None
+        } else {
+            Some(fragment_path)
+        };
+
+        Ok(file_path)
+    }
+
+    /// Returns the names of the units assigned to the given slice.
+    pub async fn slice_content(&self, slice: &str) -> anyhow::Result<Vec<String>> {
+        let unit_proxy = self.create_unit_proxy(slice).await?;
+        let content = unit_proxy.required_by().await?;
+        Ok(content)
+    }
+
+    async fn create_unit_proxy(&self, unit: &str) -> anyhow::Result<AsyncUnitProxy<'_>> {
+        let unit_object_path = self.proxy.load_unit(unit).await?;
+
+        let unit_proxy = AsyncUnitProxy::builder(self.proxy.connection())
+            .cache_properties(false)
+            .path(unit_object_path)
+            .unwrap() // safe because load_unit always returns a valid path
+            .build()
+            .await
+            .unwrap(); // safe because destination, path, and interface are set
+
+        Ok(unit_proxy)
+    }
+
     // Check if the unit name is valid and append .service if needed
     // Cannot currently fail, I'll need to dig into what is a valid unit
     // name before adding checks
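
A hedged usage sketch of the two new helpers, roughly what the cleanup stage does before deciding what to delete; `systemd_manager` is assumed to be a `SystemdManager` in scope inside an async context, and error handling is elided:

    // Enumerate the service units grouped under the Stackable slice and
    // report where each unit file lives on disk.
    let units = systemd_manager.slice_content("system-stackable.slice").await?;
    for unit in &units {
        match systemd_manager.fragment_path(unit).await? {
            Some(path) => println!("{} -> {}", unit, path),
            None => println!("{} has no unit file on disk", unit),
        }
    }
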
diff --git a/src/provider/systemdmanager/systemd1_api.rs b/src/provider/systemdmanager/systemd1_api.rs
index c085fdb..3712315 100644
--- a/src/provider/systemdmanager/systemd1_api.rs
+++ b/src/provider/systemdmanager/systemd1_api.rs
@@ -407,6 +407,12 @@ impl Display for InvocationId {
     interface = "org.freedesktop.systemd1.Unit"
 )]
 trait Unit {
+    /// `RequiredBy` contains an array which encodes the inverse
+    /// dependencies (where this applies) as configured in the unit file
+    /// or determined automatically.
+    #[dbus_proxy(property)]
+    fn required_by(&self) -> zbus::Result<Vec<String>>;
+
     /// The active state (i.e. whether the unit is currently started or
     /// not)
     #[dbus_proxy(property)]
@@ -425,10 +431,15 @@ trait Unit {
     /// states.
     ///
     /// Possible sub states can be found in the source code of systemd:
-    /// https://github.com/systemd/systemd/blob/v249/src/basic/unit-def.h
+    /// <https://github.com/systemd/systemd/blob/v249/src/basic/unit-def.h>
     #[dbus_proxy(property)]
     fn sub_state(&self) -> zbus::Result<String>;
 
+    /// `FragmentPath` contains the unit file path this unit was read
+    /// from, if there is one (if not, it contains the empty string).
+    #[dbus_proxy(property)]
+    fn fragment_path(&self) -> zbus::Result<String>;
+
     /// Unique ID for a runtime cycle of a unit
     #[dbus_proxy(property, name = "InvocationID")]
     fn invocation_id(&self) -> zbus::Result<InvocationId>;
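
Why `RequiredBy` is the right property for listing a slice's members: every service generated by the agent carries `Slice=system-stackable.slice` (added in systemdunit.rs below), and systemd gives such units an implicit dependency on their slice, so the slice's inverse `RequiredBy` list contains exactly the generated service names. Reading the property directly looks roughly like the following sketch of what `SystemdManager::create_unit_proxy` wraps (error handling elided, builder calls assumed from the zbus version in use):

    // Resolve the slice to its D-Bus object path, build a Unit proxy for
    // it, and read the inverse dependency list.
    let path = manager_proxy.load_unit("system-stackable.slice").await?;
    let unit_proxy = AsyncUnitProxy::builder(manager_proxy.connection())
        .cache_properties(false)
        .path(path)?
        .build()
        .await?;
    let services: Vec<String> = unit_proxy.required_by().await?;
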
diff --git a/src/provider/systemdmanager/systemdunit.rs b/src/provider/systemdmanager/systemdunit.rs
index d3d1d95..ee2d201 100644
--- a/src/provider/systemdmanager/systemdunit.rs
+++ b/src/provider/systemdmanager/systemdunit.rs
@@ -1,8 +1,16 @@
 use std::collections::{BTreeMap, HashMap};
+use std::fmt;
+use std::fmt::{Display, Formatter};
+use std::iter::{self, repeat};
 use std::path::Path;
 
 use kubelet::container::Container;
 use kubelet::pod::Pod;
+use lazy_static::lazy_static;
+use log::{debug, error, info, trace, warn};
+use multimap::MultiMap;
+use regex::Regex;
+use strum::{Display, EnumIter, IntoEnumIterator};
 
 use crate::provider::error::StackableError;
 use crate::provider::error::StackableError::PodValidationError;
@@ -10,19 +18,14 @@ use crate::provider::kubernetes::accessor::{restart_policy, RestartPolicy};
 use crate::provider::states::pod::creating_config::CreatingConfig;
 use crate::provider::states::pod::PodState;
 use crate::provider::systemdmanager::manager::UnitTypes;
-use lazy_static::lazy_static;
-use log::{debug, error, info, trace, warn};
-use multimap::MultiMap;
-use regex::Regex;
-use std::fmt;
-use std::fmt::{Display, Formatter};
-use std::iter::{self, repeat};
-use strum::{Display, EnumIter, IntoEnumIterator};
 
 /// The default timeout for stopping a service, after this has passed systemd will terminate
 /// the process
 const DEFAULT_TERMINATION_TIMEOUT_SECS: i64 = 30;
 
+/// The slice all service units will be placed in.
+pub const STACKABLE_SLICE: &str = "system-stackable.slice";
+
 /// List of sections in the systemd unit
 ///
 /// The sections are written in the same order as listed here into the unit file.
@@ -143,15 +146,21 @@ pub struct SystemDUnit {
 // at some point consider splitting this out and have systemdunit live
 // inside the systemd crate and the parsing in the agent
 impl SystemDUnit {
-    /// Create a new unit which inherits all common elements from ['common_properties'] and parses
-    /// everything else from the ['container']
     pub fn new(
-        common_properties: &SystemDUnit,
-        name_prefix: &str,
-        container: &Container,
         user_mode: bool,
         pod_state: &PodState,
-    ) -> Result<Self, StackableError> {
+        kubeconfig_path: &Path,
+        pod: &Pod,
+        container: &Container,
+    ) -> Result<SystemDUnit, StackableError> {
+        let common_properties = SystemDUnit::new_from_pod(pod, user_mode)?;
+
+        // Naming schema
+        // Service name: `namespace-podname`
+        // SystemdUnit: `namespace-podname-containername`
+        // TODO: add this to the docs in more detail
+        let name_prefix = format!("{}-{}-", pod.namespace(), pod.name());
+
         // Create template data to be used when rendering template strings
         let template_data = if let Ok(data) = CreatingConfig::create_render_data(pod_state) {
             data
@@ -167,15 +176,32 @@
         };
 
         let package_root = pod_state.get_service_package_directory();
 
-        SystemDUnit::new_from_container(
-            common_properties,
-            name_prefix,
+        let mut unit = SystemDUnit::new_from_container(
+            &common_properties,
+            &name_prefix,
             container,
             &pod_state.service_name,
             &template_data,
             &package_root,
             user_mode,
-        )
+        )?;
+
+        unit.set_property(Section::Service, "Slice", STACKABLE_SLICE);
+
+        const UNIT_ENV_KEY: &str = "KUBECONFIG";
+        if let Some(kubeconfig_path) = kubeconfig_path.to_str() {
+            unit.add_env_var(UNIT_ENV_KEY, kubeconfig_path);
+        } else {
+            warn!(
+                "The environment variable {} cannot be added to the systemd service [{}] because \
+                the path [{}] is not valid unicode.",
+                UNIT_ENV_KEY,
+                unit.get_name(),
+                kubeconfig_path.to_string_lossy()
+            );
+        };
+
+        Ok(unit)
     }
 
     fn new_from_container(
@@ -346,8 +372,8 @@
     }
 
     /// Configures the time to sleep in seconds before restarting a
-    /// service (as configured with [set_restart_option]). Defaults to
-    /// 100ms.
+    /// service (as configured with [`Self::set_restart_option`]).
+    /// Defaults to 100ms.
     fn set_restart_sec_option(&mut self, seconds: u32) {
         self.set_property(Section::Service, "RestartSec", &seconds.to_string());
     }
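
The naming schema in the comment above, as a worked example with hypothetical values (a pod `zookeeper` in namespace `default` with a container `zk`):

    // Service name: `namespace-podname`; unit name:
    // `namespace-podname-containername.service`.
    let (namespace, pod_name, container_name) = ("default", "zookeeper", "zk");
    let service_name = format!("{}-{}", namespace, pod_name);  // "default-zookeeper"
    let name_prefix = format!("{}-", service_name);            // "default-zookeeper-"
    let unit_name = format!("{}{}.service", name_prefix, container_name);
    assert_eq!(unit_name, "default-zookeeper-zk.service");
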
{r#" [Unit] + Description=default-stackable-test-container StartLimitIntervalSec=0 [Service] + Environment="KUBECONFIG=~/.kube/config" + ExecStart=/run/test-1.0.0/start.sh RemainAfterExit=no Restart=always RestartSec=2 + Slice=system-stackable.slice + StandardError=journal + StandardOutput=journal TimeoutStopSec=30 - User=pod-user"} + User=pod-user + + [Install] + WantedBy=multi-user.target"#} )] #[case::with_container_on_system_bus( BusType::System, @@ -663,7 +702,7 @@ mod test { name: stackable spec: containers: - - name: test-container.service + - name: test-container command: - start.sh args: @@ -687,12 +726,14 @@ mod test { StartLimitIntervalSec=0 [Service] + Environment="KUBECONFIG=~/.kube/config" Environment="LOG_DIR=/var/log/default-stackable" Environment="LOG_LEVEL=INFO" - ExecStart=start.sh arg /etc/default-stackable + ExecStart=/run/test-1.0.0/start.sh arg /etc/default-stackable-b3ca9d08-b97d-45bc-9da1-7b0156712ef1 RemainAfterExit=no Restart=always RestartSec=2 + Slice=system-stackable.slice StandardError=journal StandardOutput=journal TimeoutStopSec=30 @@ -710,7 +751,7 @@ mod test { name: stackable spec: containers: - - name: test-container.service + - name: test-container command: - start.sh securityContext: @@ -726,10 +767,12 @@ mod test { StartLimitIntervalSec=0 [Service] - ExecStart=start.sh + Environment="KUBECONFIG=~/.kube/config" + ExecStart=/run/test-1.0.0/start.sh RemainAfterExit=no Restart=always RestartSec=2 + Slice=system-stackable.slice StandardError=journal StandardOutput=journal TimeoutStopSec=30 @@ -745,18 +788,30 @@ mod test { metadata: name: stackable spec: - terminationGracePeriodSeconds: 10 - containers: []", - "stackable.service", - indoc! {" + containers: + - name: test-container + command: + - start.sh + terminationGracePeriodSeconds: 10", + "default-stackable-test-container.service", + indoc! {r#" [Unit] + Description=default-stackable-test-container StartLimitIntervalSec=0 [Service] + Environment="KUBECONFIG=~/.kube/config" + ExecStart=/run/test-1.0.0/start.sh RemainAfterExit=no Restart=always RestartSec=2 - TimeoutStopSec=10"} + Slice=system-stackable.slice + StandardError=journal + StandardOutput=journal + TimeoutStopSec=10 + + [Install] + WantedBy=multi-user.target"#} )] #[case::set_restart_policy( BusType::System, @@ -766,55 +821,60 @@ mod test { metadata: name: stackable spec: - containers: [] + containers: + - name: test-container + command: + - start.sh restartPolicy: OnFailure", - "stackable.service", - indoc! {" + "default-stackable-test-container.service", + indoc! 
{r#" [Unit] + Description=default-stackable-test-container StartLimitIntervalSec=0 [Service] + Environment="KUBECONFIG=~/.kube/config" + ExecStart=/run/test-1.0.0/start.sh RemainAfterExit=yes Restart=on-failure RestartSec=2 - TimeoutStopSec=30" - } - )] + Slice=system-stackable.slice + StandardError=journal + StandardOutput=journal + TimeoutStopSec=30 + [Install] + WantedBy=multi-user.target"#} + )] fn create_unit_from_pod( #[case] bus_type: BusType, #[case] pod: TestPod, #[case] expected_unit_file_name: &str, #[case] expected_unit_file_content: &str, ) { - let mut result = SystemDUnit::new_from_pod(&pod, bus_type == BusType::Session); - - if let Ok(common_properties) = &result { - if let Some(container) = pod.containers().first() { - let service_name = format!("{}-{}", pod.namespace(), pod.name()); - let name_prefix = format!("{}-", service_name); - let mut template_data = BTreeMap::new(); - template_data.insert( - String::from("logroot"), - format!("/var/log/{}", &service_name), - ); - template_data.insert( - String::from("configroot"), - format!("/etc/{}", &service_name), - ); - let package_root = PathBuf::new(); - - result = SystemDUnit::new_from_container( - common_properties, - &name_prefix, - container, - &service_name, - &template_data, - &package_root, - bus_type == BusType::Session, - ); - } - } + let kubeconfig_path = PathBuf::from("~/.kube/config"); + + let pod_state = PodState { + parcel_directory: PathBuf::from("/run"), + download_directory: PathBuf::new(), + config_directory: PathBuf::from("/etc"), + log_directory: PathBuf::from("/var/log"), + package_download_backoff_strategy: ExponentialBackoffStrategy::default(), + service_name: format!("{}-{}", pod.namespace(), pod.name()), + service_uid: String::from("b3ca9d08-b97d-45bc-9da1-7b0156712ef1"), + package: Package { + product: String::from("test"), + version: String::from("1.0.0"), + }, + }; + + let result = SystemDUnit::new( + bus_type == BusType::Session, + &pod_state, + &kubeconfig_path, + &pod, + pod.containers().first().expect("A container is required."), + ); if let Ok(unit) = result { assert_eq!(expected_unit_file_name, unit.get_name());