From 9fd55602b2e0dbd3c293c7eae635299cb4d72466 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 19 May 2021 14:01:21 +0200 Subject: [PATCH 1/9] Functions to start and stop systemd units now await the JobRemoved signal --- Cargo.lock | 1 + Cargo.toml | 1 + src/provider/systemdmanager/manager.rs | 69 ++++++-- src/provider/systemdmanager/systemd1_api.rs | 181 ++++++++++++++++++-- 4 files changed, 226 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d61c32d..afb545b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2704,6 +2704,7 @@ dependencies = [ "byteorder", "env_logger 0.8.3", "flate2", + "futures-util", "handlebars", "hostname", "indoc", diff --git a/Cargo.toml b/Cargo.toml index 7bf10a3..784c990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ async-trait = "0.1" byteorder = "1.4" env_logger = "0.8" flate2 = "1.0" +futures-util = "0.3" handlebars = "3.5" hostname = "0.3" k8s-openapi = { version = "0.11", default-features = false, features = ["api", "v1_18"] } diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 84e4084..5333bf3 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -3,14 +3,19 @@ //! The module offers the ability to create, remove, start, stop, enable and //! disable systemd units. //! -use super::systemd1_api::{ActiveState, AsyncManagerProxy, StartMode, StopMode}; +use super::systemd1_api::{ + ActiveState, AsyncJobProxy, AsyncManagerProxy, JobRemovedResult, JobRemovedSignal, + ManagerSignals, StartMode, StopMode, +}; use crate::provider::systemdmanager::systemdunit::SystemDUnit; use crate::provider::StackableError; use crate::provider::StackableError::RuntimeError; use anyhow::anyhow; +use futures_util::{future, stream::StreamExt}; use log::debug; use std::fs; use std::fs::File; +use std::future::Future; use std::io::Write; use std::path::PathBuf; use zbus::azync::Connection; @@ -273,15 +278,17 @@ impl SystemdManager { /// systemd at the time this is called. /// To make a service known please take a look at the [`SystemdManager::enable`] function. pub async fn start(&self, unit: &str) -> anyhow::Result<()> { - debug!("Attempting to start unit {}", unit); + debug!("Trying to start unit [{}].", unit); - match self.proxy.start_unit(unit, StartMode::Fail).await { - Ok(result) => { - debug!("Successfully started service [{}]: [{:?}]", unit, result); - Ok(()) - } - Err(e) => Err(anyhow!("Error starting service [{}]: {}", unit, e)), + let result = self + .process_job(|proxy| proxy.start_unit(unit, StartMode::Fail)) + .await; + + if result.is_ok() { + debug!("Successfully started service [{}]", unit); } + + result.map_err(|e| anyhow!("Error starting service [{}]: {}", unit, e)) } /// Attempts to stop a systemd unit @@ -289,14 +296,46 @@ impl SystemdManager { /// systemd at the time this is called. /// To make a service known please take a look at the [`SystemdManager::enable`] function. pub async fn stop(&self, unit: &str) -> anyhow::Result<()> { - debug!("Trying to stop systemd unit [{}]", unit); + debug!("Trying to stop systemd unit [{}].", unit); - match self.proxy.stop_unit(unit, StopMode::Fail).await { - Ok(result) => { - debug!("Successfully stopped service [{}]: [{:?}]", unit, result); - Ok(()) - } - Err(e) => Err(anyhow!("Error stopping service [{}]: {}", unit, e)), + let result = self + .process_job(|proxy| proxy.stop_unit(unit, StopMode::Fail)) + .await; + + if result.is_ok() { + debug!("Successfully stopped service [{}]", unit); + } + + result.map_err(|e| anyhow!("Error stopping service [{}]: {}", unit, e)) + } + + /// Runs the given task and waits until the job returned by the task + /// is finished. + async fn process_job<'a, F, Fut>(&'a self, task: F) -> anyhow::Result<()> + where + F: Fn(&'a AsyncManagerProxy) -> Fut, + Fut: Future>>, + { + let signals = self + .proxy + .receive_signal(ManagerSignals::JobRemoved.into()) + .await? + .map(|message| message.body::().unwrap()); + + let job = task(&self.proxy).await?; + + let signal = signals + .filter(|signal| future::ready(&signal.job.to_owned().into_inner() == job.path())) + .next() + .await; + + match signal { + Some(message) if message.result == JobRemovedResult::Done => Ok(()), + Some(message) => Err(anyhow!("The systemd job failed: {:?}", message)), + None => Err(anyhow!( + "No signal was returned for the systemd job: {:?}", + job + )), } } diff --git a/src/provider/systemdmanager/systemd1_api.rs b/src/provider/systemdmanager/systemd1_api.rs index b57b9f2..401b3ce 100644 --- a/src/provider/systemdmanager/systemd1_api.rs +++ b/src/provider/systemdmanager/systemd1_api.rs @@ -7,12 +7,12 @@ use std::{ fmt::{self, Formatter}, str::FromStr, }; -use strum::{AsRefStr, EnumString, EnumVariantNames, VariantNames}; +use strum::{AsRefStr, Display, EnumString, EnumVariantNames, IntoStaticStr, VariantNames}; use zbus::dbus_proxy; -use zvariant::{derive::Type, OwnedValue, Signature, Type}; +use zvariant::{derive::Type, OwnedObjectPath, OwnedValue, Signature, Type}; /// Type of an entry in a changes list -#[derive(Debug, EnumString, EnumVariantNames, PartialEq)] +#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)] #[strum(serialize_all = "kebab-case")] pub enum ChangeType { Symlink, @@ -63,8 +63,8 @@ pub struct Change { type Changes = Vec; /// Mode in which a unit will be started -#[derive(Debug, AsRefStr)] -#[allow(dead_code)] +#[derive(Debug, Display, AsRefStr)] +#[strum(serialize_all = "kebab-case")] pub enum StartMode { /// The unit and its dependencies will be started, possibly /// replacing already queued jobs that conflict with it. @@ -107,8 +107,8 @@ impl Type for StartMode { } /// Mode in which a unit will be stopped -#[derive(Debug, AsRefStr)] -#[allow(dead_code)] +#[derive(Debug, Display, AsRefStr)] +#[strum(serialize_all = "kebab-case")] pub enum StopMode { /// The unit and its dependencies will be stopped, possibly /// replacing already queued jobs that conflict with it. @@ -155,7 +155,7 @@ impl Type for StopMode { /// Synchronous API: /// /// ``` -/// # use stackable_agent::provider::systemdmanager::systemd1_api::ManagerProxy; +/// # use stackable_agent::provider::systemdmanager::systemd1_api::*; /// let connection = zbus::Connection::new_system().unwrap(); /// let manager = ManagerProxy::new(&connection).unwrap(); /// let unit = manager.load_unit("dbus.service").unwrap(); @@ -164,7 +164,7 @@ impl Type for StopMode { /// Asynchronous API: /// /// ``` -/// # use stackable_agent::provider::systemdmanager::systemd1_api::AsyncManagerProxy; +/// # use stackable_agent::provider::systemdmanager::systemd1_api::*; /// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// let connection = zbus::azync::Connection::new_system().await.unwrap(); /// let manager = AsyncManagerProxy::new(&connection).unwrap(); @@ -234,9 +234,109 @@ trait Manager { fn link_unit_files(&self, files: &[&str], runtime: bool, force: bool) -> zbus::Result; } +/// Signals of the manager object. +/// +/// Currently not all signals are listed. +/// +/// # Example +/// +/// ``` +/// # use stackable_agent::provider::systemdmanager::systemd1_api::*; +/// // necessary when calling `map` on `zbus::azync::SignalStream` +/// use futures_util::stream::StreamExt; +/// +/// # tokio::runtime::Runtime::new().unwrap().block_on(async { +/// let connection = zbus::azync::Connection::new_system().await.unwrap(); +/// let manager = AsyncManagerProxy::new(&connection).unwrap(); +/// let signals = manager +/// .receive_signal(ManagerSignals::JobRemoved.into()).await.unwrap() +/// .map(|message| message.body::().unwrap()); +/// # }); +/// ``` +#[derive(Debug, Display, Eq, PartialEq, IntoStaticStr)] +pub enum ManagerSignals { + /// Sent out each time a job is dequeued + JobRemoved, +} + +/// Result in the `JobRemoved` signal. +#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)] +#[strum(serialize_all = "kebab-case")] +pub enum JobRemovedResult { + /// Indicates successful execution of a job + Done, + + /// Indicates that a job has been canceled before it finished + /// execution; This doesn't necessarily mean though that the job + /// operation is actually cancelled too. + Canceled, + + /// Indicates that the job timeout was reached + Timeout, + + /// Indicates that the job failed + Failed, + + /// Indicates that a job this job depended on failed and the job + /// hence was removed as well + Dependency, + + /// Indicates that a job was skipped because it didn't apply to the + /// unit's current state + Skipped, +} + +impl<'de> Deserialize<'de> for JobRemovedResult { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct VariantVisitor; + + impl<'de> Visitor<'de> for VariantVisitor { + type Value = JobRemovedResult; + + fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "Expecting one of {:?}", Self::Value::VARIANTS) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + FromStr::from_str(v).map_err(|_| E::unknown_variant(v, Self::Value::VARIANTS)) + } + } + + deserializer.deserialize_str(VariantVisitor) + } +} + +impl Type for JobRemovedResult { + fn signature() -> Signature<'static> { + String::signature() + } +} + +/// Message body of [`ManagerSignals::JobRemoved`] +#[derive(Debug, Deserialize, Type)] +pub struct JobRemovedSignal { + /// Numeric job ID + pub id: u32, + + /// Bus path + pub job: OwnedObjectPath, + + /// Primary unit name for this job + pub unit: String, + + /// Result + pub result: JobRemovedResult, +} + /// ActiveState contains a state value that reflects whether the unit is /// currently active or not. -#[derive(Debug, EnumString, PartialEq)] +#[derive(Debug, Display, EnumString, Eq, PartialEq)] #[strum(serialize_all = "kebab-case")] pub enum ActiveState { /// The unit is active. @@ -272,6 +372,7 @@ impl TryFrom for ActiveState { } /// Unique ID for a runtime cycle of a unit +#[derive(Debug, Eq, PartialEq)] pub struct InvocationId(Vec); impl TryFrom for InvocationId { @@ -333,6 +434,11 @@ mod test { assert_eq!(ChangeType::Symlink, deserialize(&serialize("symlink"))); } + #[test] + fn display_change_type() { + assert_eq!("symlink", ChangeType::Symlink.to_string()); + } + #[test] fn serialize_start_mode() { assert_eq!( @@ -341,6 +447,14 @@ mod test { ); } + #[test] + fn display_start_mode() { + assert_eq!( + "ignore-dependencies", + StartMode::IgnoreDependencies.to_string() + ); + } + #[test] fn serialize_stop_mode() { assert_eq!( @@ -349,6 +463,35 @@ mod test { ); } + #[test] + fn display_stop_mode() { + assert_eq!( + "ignore-dependencies", + StopMode::IgnoreDependencies.to_string() + ); + } + + #[test] + fn display_manager_signals() { + assert_eq!("JobRemoved", ManagerSignals::JobRemoved.to_string()); + } + + #[test] + fn convert_manager_signals_into_static_str() { + let static_str: &'static str = ManagerSignals::JobRemoved.into(); + assert_eq!("JobRemoved", static_str); + } + + #[test] + fn deserialize_job_removed_result() { + assert_eq!(JobRemovedResult::Done, deserialize(&serialize("done"))); + } + + #[test] + fn display_job_removed_result() { + assert_eq!("done", JobRemovedResult::Done.to_string()); + } + #[test] fn try_active_state_from_owned_value() { assert_eq!( @@ -358,7 +501,23 @@ mod test { } #[test] - fn invocation_id_to_string() { + fn display_active_state() { + assert_eq!("active", ActiveState::Active.to_string()); + } + + #[test] + fn try_invocation_id_from_owned_value() { + let bytes = vec![ + 0xbe, 0x44, 0xae, 0xfc, 0xa3, 0xbf, 0x46, 0xba, 0xb0, 0x4b, 0x37, 0x52, 0x09, 0x5d, + 0xd9, 0x97, + ]; + let invocation_id = InvocationId(bytes.clone()); + let owned_value = OwnedValue::from(Value::from(bytes)); + assert_eq!(invocation_id, InvocationId::try_from(owned_value).unwrap()); + } + + #[test] + fn display_invocation_id() { let invocation_id = InvocationId(vec![ 0xbe, 0x44, 0xae, 0xfc, 0xa3, 0xbf, 0x46, 0xba, 0xb0, 0x4b, 0x37, 0x52, 0x09, 0x5d, 0xd9, 0x97, From 1811b4b67e760208abacedf408c15c7952c197b6 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 19 May 2021 14:03:51 +0200 Subject: [PATCH 2/9] All dependencies upgraded --- Cargo.lock | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index afb545b..d70cd27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,9 +332,9 @@ dependencies = [ [[package]] name = "const_fn" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402da840495de3f976eaefc3485b7f5eb5b0bf9761f9a47be27fe975b3b8c2ec" +checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7" [[package]] name = "core-foundation" @@ -354,9 +354,9 @@ checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" [[package]] name = "cpufeatures" -version = "0.1.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec1028182c380cc45a2e2c5ec841134f2dfd0f8f5f0a5bcd68004f81b5efdf4" +checksum = "ed00c67cb5d0a7d64a44f6ad2668db7e7530311dd53ea79bcd4fb022c64911c8" dependencies = [ "libc", ] @@ -918,7 +918,7 @@ dependencies = [ "headers-core", "http 0.2.4", "mime", - "sha-1 0.9.5", + "sha-1 0.9.6", "time 0.1.44", ] @@ -1124,9 +1124,9 @@ dependencies = [ [[package]] name = "inotify" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19f57db1baad9d09e43a3cd76dcf82ebdafd37d75c9498b87762dba77c93f15" +checksum = "b031475cb1b103ee221afb806a23d35e0570bf7271d7588762ceba8127ed43b3" dependencies = [ "bitflags", "inotify-sys", @@ -1760,9 +1760,9 @@ dependencies = [ [[package]] name = "openssl-probe" -version = "0.1.2" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" @@ -2539,9 +2539,9 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dc6b7951b17b051f3210b063f12cc17320e2fe30ae05b0fe2a3abb068551c76" +checksum = "98d0516900518c29efa217c298fa1f4e6c6ffc85ae29fd7f4ee48f176e1a9ed5" dependencies = [ "proc-macro2", "quote", @@ -2586,9 +2586,9 @@ dependencies = [ [[package]] name = "sha-1" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b659df5fc3ce22274daac600ffb845300bd2125bcfaec047823075afdab81c00" +checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16" dependencies = [ "block-buffer 0.9.0", "cfg-if 1.0.0", @@ -2605,9 +2605,9 @@ checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" [[package]] name = "sha2" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8f6b75b17576b792bef0db1bcc4b8b8bcdf9506744cf34b974195487af6cff2" +checksum = "b362ae5752fd2137731f9fa25fd4d9058af34666ca1966fb969119cc35719f12" dependencies = [ "block-buffer 0.9.0", "cfg-if 1.0.0", @@ -3071,7 +3071,7 @@ dependencies = [ "once_cell", "pin-project-lite 0.2.6", "signal-hook-registry", - "tokio-macros 1.1.0", + "tokio-macros 1.2.0", "winapi 0.3.9", ] @@ -3102,9 +3102,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" dependencies = [ "proc-macro2", "quote", @@ -3134,9 +3134,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0" +checksum = "f8864d706fdb3cc0843a49647ac892720dac98a6eeb818b77190592cf4994066" dependencies = [ "futures-core", "pin-project-lite 0.2.6", @@ -3159,9 +3159,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" dependencies = [ "bytes 1.0.1", "futures-core", @@ -3327,7 +3327,7 @@ dependencies = [ "input_buffer", "log", "rand 0.8.3", - "sha-1 0.9.5", + "sha-1 0.9.6", "url 2.2.2", "utf-8", ] From 8a2316921e2a48db31e0da3668c3c5d8610ff7f0 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 19 May 2021 14:16:50 +0200 Subject: [PATCH 3/9] Version set to 0.1.1 and changelog updated --- CHANGELOG.md | 38 ++++++++++++++++++++++++++++++++++++++ Cargo.lock | 2 +- Cargo.toml | 2 +- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b8e5f2..87c06c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.1.1 - 2021-05-20 + +### Fixed +- Pod state synchronized with systemd service state ([#164]). + +[#164]: https://github.com/stackabletech/agent/pull/164 + ## 0.1.0 - 2021-05-17 ### Added @@ -18,3 +25,34 @@ - Retrieval of container logs with kubectl logs implemented ([#135]). - Configuration of terminationGracePeriodSeconds considered in systemd units ([#138]). - Systemd dependency adapted so that it is compatible with systemd version 241 ([#145]). + +[#1]: https://github.com/stackabletech/agent/pull/1 +[#18]: https://github.com/stackabletech/agent/pull/18 +[#23]: https://github.com/stackabletech/agent/pull/23 +[#25]: https://github.com/stackabletech/agent/pull/25 +[#26]: https://github.com/stackabletech/agent/pull/26 +[#30]: https://github.com/stackabletech/agent/pull/30 +[#32]: https://github.com/stackabletech/agent/pull/32 +[#35]: https://github.com/stackabletech/agent/pull/35 +[#36]: https://github.com/stackabletech/agent/pull/36 +[#40]: https://github.com/stackabletech/agent/pull/40 +[#43]: https://github.com/stackabletech/agent/pull/43 +[#50]: https://github.com/stackabletech/agent/pull/50 +[#53]: https://github.com/stackabletech/agent/pull/53 +[#56]: https://github.com/stackabletech/agent/pull/56 +[#57]: https://github.com/stackabletech/agent/pull/57 +[#63]: https://github.com/stackabletech/agent/pull/63 +[#72]: https://github.com/stackabletech/agent/pull/72 +[#73]: https://github.com/stackabletech/agent/pull/73 +[#77]: https://github.com/stackabletech/agent/pull/77 +[#78]: https://github.com/stackabletech/agent/pull/78 +[#79]: https://github.com/stackabletech/agent/pull/79 +[#94]: https://github.com/stackabletech/agent/pull/94 +[#100]: https://github.com/stackabletech/agent/pull/100 +[#109]: https://github.com/stackabletech/agent/pull/109 +[#110]: https://github.com/stackabletech/agent/pull/110 +[#135]: https://github.com/stackabletech/agent/pull/135 +[#138]: https://github.com/stackabletech/agent/pull/138 +[#144]: https://github.com/stackabletech/agent/pull/144 +[#145]: https://github.com/stackabletech/agent/pull/145 +[#152]: https://github.com/stackabletech/agent/pull/152 diff --git a/Cargo.lock b/Cargo.lock index d70cd27..79b9534 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2696,7 +2696,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "stackable-agent" -version = "0.2.0-nightly" +version = "0.1.1" dependencies = [ "Inflector", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 784c990..2df289a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "stackable-agent" description = "The component of the Stackable Platform that manages installation of services on the workers" -version = "0.2.0-nightly" +version = "0.1.1" authors = ["Sönke Liebau "] edition = "2018" license = "Apache-2.0" From 30736b01e5095d07ebe07ad92a48be902558d736 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 19 May 2021 14:20:18 +0200 Subject: [PATCH 4/9] Changelog converted to asciidoc --- CHANGELOG.adoc | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++ CHANGELOG.md | 58 -------------------------------------------------- 2 files changed, 58 insertions(+), 58 deletions(-) create mode 100644 CHANGELOG.adoc delete mode 100644 CHANGELOG.md diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc new file mode 100644 index 0000000..6b17cdb --- /dev/null +++ b/CHANGELOG.adoc @@ -0,0 +1,58 @@ += Changelog + +== 0.1.1 - 2021-05-20 + +:164: https://github.com/stackabletech/agent/pull/164[#164] + +=== Fixed +* Pod state synchronized with systemd service state ({164}). + +== 0.1.0 - 2021-05-17 + +:1: https://github.com/stackabletech/agent/pull/1[#1] +:18: https://github.com/stackabletech/agent/pull/18[#18] +:23: https://github.com/stackabletech/agent/pull/23[#23] +:25: https://github.com/stackabletech/agent/pull/25[#25] +:26: https://github.com/stackabletech/agent/pull/26[#26] +:30: https://github.com/stackabletech/agent/pull/30[#30] +:32: https://github.com/stackabletech/agent/pull/32[#32] +:35: https://github.com/stackabletech/agent/pull/35[#35] +:36: https://github.com/stackabletech/agent/pull/36[#36] +:40: https://github.com/stackabletech/agent/pull/40[#40] +:43: https://github.com/stackabletech/agent/pull/43[#43] +:50: https://github.com/stackabletech/agent/pull/50[#50] +:53: https://github.com/stackabletech/agent/pull/53[#53] +:56: https://github.com/stackabletech/agent/pull/56[#56] +:57: https://github.com/stackabletech/agent/pull/57[#57] +:63: https://github.com/stackabletech/agent/pull/63[#63] +:72: https://github.com/stackabletech/agent/pull/72[#72] +:73: https://github.com/stackabletech/agent/pull/73[#73] +:77: https://github.com/stackabletech/agent/pull/77[#77] +:78: https://github.com/stackabletech/agent/pull/78[#78] +:79: https://github.com/stackabletech/agent/pull/79[#79] +:94: https://github.com/stackabletech/agent/pull/94[#94] +:100: https://github.com/stackabletech/agent/pull/100[#100] +:109: https://github.com/stackabletech/agent/pull/109[#109] +:110: https://github.com/stackabletech/agent/pull/110[#110] +:135: https://github.com/stackabletech/agent/pull/135[#135] +:138: https://github.com/stackabletech/agent/pull/138[#138] +:144: https://github.com/stackabletech/agent/pull/144[#144] +:145: https://github.com/stackabletech/agent/pull/145[#145] +:152: https://github.com/stackabletech/agent/pull/152[#152] + +=== Added +* Apache license v2.0 set ({23}). +* Krustlet based agent implementation created ({1}, {18}, {26}, {35}, {40}). +* Functionality to stop and restart processes added ({25}). +* Agent restart without impacting running services enabled ({63}). +* Rendering of template variables to environment variables added ({30}). +* Setting of pod condition "ready" for state "running" added ({32}). +* Support for command line parameters added ({36}, {50}, {72}, {109}). +* Integration with systemd implemented ({43}, {53}, {100}, {152}). +* Dependabot and security audit enabled ({56}, {57}). +* Building and publishing of nightly deb and rpm packages added ({73}, {78}, {94}, {110}, {144}). +* Bootstrapping of certificates and kubeconfig added ({77}). +* Support for running of services as application users added ({79}). +* Retrieval of container logs with kubectl logs implemented ({135}). +* Configuration of terminationGracePeriodSeconds considered in systemd units ({138}). +* Systemd dependency adapted so that it is compatible with systemd version 241 ({145}). diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index 87c06c4..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,58 +0,0 @@ -# Changelog - -## 0.1.1 - 2021-05-20 - -### Fixed -- Pod state synchronized with systemd service state ([#164]). - -[#164]: https://github.com/stackabletech/agent/pull/164 - -## 0.1.0 - 2021-05-17 - -### Added -- Apache license v2.0 set ([#23]). -- Krustlet based agent implementation created ([#1], [#18], [#26], [#35], [#40]). -- Functionality to stop and restart processes added ([#25]). -- Agent restart without impacting running services enabled ([#63]). -- Rendering of template variables to environment variables added ([#30]). -- Setting of pod condition "ready" for state "running" added ([#32]). -- Support for command line parameters added ([#36], [#50], [#72], [#109]). -- Integration with systemd implemented ([#43], [#53], [#100], [#152]). -- Dependabot and security audit enabled ([#56], [#57]). -- Building and publishing of nightly deb and rpm packages added ([#73], [#78], [#94], [#110], [#144]). -- Bootstrapping of certificates and kubeconfig added ([#77]). -- Support for running of services as application users added ([#79]). -- Retrieval of container logs with kubectl logs implemented ([#135]). -- Configuration of terminationGracePeriodSeconds considered in systemd units ([#138]). -- Systemd dependency adapted so that it is compatible with systemd version 241 ([#145]). - -[#1]: https://github.com/stackabletech/agent/pull/1 -[#18]: https://github.com/stackabletech/agent/pull/18 -[#23]: https://github.com/stackabletech/agent/pull/23 -[#25]: https://github.com/stackabletech/agent/pull/25 -[#26]: https://github.com/stackabletech/agent/pull/26 -[#30]: https://github.com/stackabletech/agent/pull/30 -[#32]: https://github.com/stackabletech/agent/pull/32 -[#35]: https://github.com/stackabletech/agent/pull/35 -[#36]: https://github.com/stackabletech/agent/pull/36 -[#40]: https://github.com/stackabletech/agent/pull/40 -[#43]: https://github.com/stackabletech/agent/pull/43 -[#50]: https://github.com/stackabletech/agent/pull/50 -[#53]: https://github.com/stackabletech/agent/pull/53 -[#56]: https://github.com/stackabletech/agent/pull/56 -[#57]: https://github.com/stackabletech/agent/pull/57 -[#63]: https://github.com/stackabletech/agent/pull/63 -[#72]: https://github.com/stackabletech/agent/pull/72 -[#73]: https://github.com/stackabletech/agent/pull/73 -[#77]: https://github.com/stackabletech/agent/pull/77 -[#78]: https://github.com/stackabletech/agent/pull/78 -[#79]: https://github.com/stackabletech/agent/pull/79 -[#94]: https://github.com/stackabletech/agent/pull/94 -[#100]: https://github.com/stackabletech/agent/pull/100 -[#109]: https://github.com/stackabletech/agent/pull/109 -[#110]: https://github.com/stackabletech/agent/pull/110 -[#135]: https://github.com/stackabletech/agent/pull/135 -[#138]: https://github.com/stackabletech/agent/pull/138 -[#144]: https://github.com/stackabletech/agent/pull/144 -[#145]: https://github.com/stackabletech/agent/pull/145 -[#152]: https://github.com/stackabletech/agent/pull/152 From 9d3602c8b22d514f89fd9c98258651a15d4229c7 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 19 May 2021 16:29:21 +0200 Subject: [PATCH 5/9] Repetitive code replaced with macros --- src/provider/systemdmanager/systemd1_api.rs | 160 +++++++++----------- 1 file changed, 73 insertions(+), 87 deletions(-) diff --git a/src/provider/systemdmanager/systemd1_api.rs b/src/provider/systemdmanager/systemd1_api.rs index 401b3ce..c2154a7 100644 --- a/src/provider/systemdmanager/systemd1_api.rs +++ b/src/provider/systemdmanager/systemd1_api.rs @@ -11,46 +11,85 @@ use strum::{AsRefStr, Display, EnumString, EnumVariantNames, IntoStaticStr, Vari use zbus::dbus_proxy; use zvariant::{derive::Type, OwnedObjectPath, OwnedValue, Signature, Type}; -/// Type of an entry in a changes list -#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)] -#[strum(serialize_all = "kebab-case")] -pub enum ChangeType { - Symlink, - Unlink, +/// Implements [`Serialize`] for an enum. +/// +/// The variants are serialized to strings in kebab-case. +/// The enum must be annotated with `#[derive(AsRefStr)]`. +macro_rules! impl_serialize_for_enum { + ($t:ty) => { + impl Serialize for $t { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&kebabcase::to_kebab_case(self.as_ref())) + } + } + }; } -impl<'de> Deserialize<'de> for ChangeType { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct VariantVisitor; +/// Implements [`Deserialize`] for an enum. +/// +/// The variants are deserialized from strings in kebab-case. +/// The enum must be annotated with the following attributes: +/// ``` +/// #[derive(EnumString, EnumVariantNames)] +/// #[strum(serialize_all = "kebab-case")] +/// ``` +macro_rules! impl_deserialize_for_enum { + ($t:ty) => { + impl<'de> Deserialize<'de> for $t { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct VariantVisitor; - impl<'de> Visitor<'de> for VariantVisitor { - type Value = ChangeType; + impl<'de> Visitor<'de> for VariantVisitor { + type Value = $t; - fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { - write!(formatter, "Expecting one of {:?}", Self::Value::VARIANTS) - } + fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "Expecting one of {:?}", Self::Value::VARIANTS) + } - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - FromStr::from_str(v).map_err(|_| E::unknown_variant(v, Self::Value::VARIANTS)) + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + FromStr::from_str(v) + .map_err(|_| E::unknown_variant(v, Self::Value::VARIANTS)) + } + } + + deserializer.deserialize_str(VariantVisitor) } } + }; +} - deserializer.deserialize_str(VariantVisitor) - } +/// Implements [`Type`] for an enum which is serialized from or +/// deserialized to a string. +macro_rules! impl_type_for_enum { + ($t:ty) => { + impl Type for $t { + fn signature() -> Signature<'static> { + String::signature() + } + } + }; } -impl Type for ChangeType { - fn signature() -> Signature<'static> { - String::signature() - } +/// Type of an entry in a changes list +#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)] +#[strum(serialize_all = "kebab-case")] +pub enum ChangeType { + Symlink, + Unlink, } +impl_deserialize_for_enum!(ChangeType); +impl_type_for_enum!(ChangeType); + /// Entry of a changes list #[derive(Debug, Type, Deserialize)] pub struct Change { @@ -91,20 +130,8 @@ pub enum StartMode { IgnoreRequirements, } -impl Serialize for StartMode { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&kebabcase::to_kebab_case(self.as_ref())) - } -} - -impl Type for StartMode { - fn signature() -> Signature<'static> { - String::signature() - } -} +impl_serialize_for_enum!(StartMode); +impl_type_for_enum!(StartMode); /// Mode in which a unit will be stopped #[derive(Debug, Display, AsRefStr)] @@ -131,20 +158,8 @@ pub enum StopMode { IgnoreRequirements, } -impl Serialize for StopMode { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&kebabcase::to_kebab_case(self.as_ref())) - } -} - -impl Type for StopMode { - fn signature() -> Signature<'static> { - String::signature() - } -} +impl_serialize_for_enum!(StopMode); +impl_type_for_enum!(StopMode); /// The manager object is the central entry point for clients. /// @@ -286,37 +301,8 @@ pub enum JobRemovedResult { Skipped, } -impl<'de> Deserialize<'de> for JobRemovedResult { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct VariantVisitor; - - impl<'de> Visitor<'de> for VariantVisitor { - type Value = JobRemovedResult; - - fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { - write!(formatter, "Expecting one of {:?}", Self::Value::VARIANTS) - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - FromStr::from_str(v).map_err(|_| E::unknown_variant(v, Self::Value::VARIANTS)) - } - } - - deserializer.deserialize_str(VariantVisitor) - } -} - -impl Type for JobRemovedResult { - fn signature() -> Signature<'static> { - String::signature() - } -} +impl_deserialize_for_enum!(JobRemovedResult); +impl_type_for_enum!(JobRemovedResult); /// Message body of [`ManagerSignals::JobRemoved`] #[derive(Debug, Deserialize, Type)] From 45c442fbd8ebf00433a80490e406feeb32de85ee Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 19 May 2021 16:46:55 +0200 Subject: [PATCH 6/9] Periods removed from log output --- src/provider/systemdmanager/manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 5333bf3..0f79420 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -278,7 +278,7 @@ impl SystemdManager { /// systemd at the time this is called. /// To make a service known please take a look at the [`SystemdManager::enable`] function. pub async fn start(&self, unit: &str) -> anyhow::Result<()> { - debug!("Trying to start unit [{}].", unit); + debug!("Trying to start unit [{}]", unit); let result = self .process_job(|proxy| proxy.start_unit(unit, StartMode::Fail)) @@ -296,7 +296,7 @@ impl SystemdManager { /// systemd at the time this is called. /// To make a service known please take a look at the [`SystemdManager::enable`] function. pub async fn stop(&self, unit: &str) -> anyhow::Result<()> { - debug!("Trying to stop systemd unit [{}].", unit); + debug!("Trying to stop systemd unit [{}]", unit); let result = self .process_job(|proxy| proxy.stop_unit(unit, StopMode::Fail)) From 81523fe14aed89733633fd21f0ee12b4f2b1bed7 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Thu, 20 May 2021 10:22:34 +0200 Subject: [PATCH 7/9] Code commented --- src/provider/systemdmanager/manager.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 0f79420..7900a89 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -316,6 +316,7 @@ impl SystemdManager { F: Fn(&'a AsyncManagerProxy) -> Fut, Fut: Future>>, { + // Listen for `JobRemoved` signals. let signals = self .proxy .receive_signal(ManagerSignals::JobRemoved.into()) @@ -324,10 +325,18 @@ impl SystemdManager { let job = task(&self.proxy).await?; - let signal = signals - .filter(|signal| future::ready(&signal.job.to_owned().into_inner() == job.path())) - .next() - .await; + // Narrow signal stream down to the corresponding job. + let mut signals = signals + .filter(|signal| future::ready(&signal.job.to_owned().into_inner() == job.path())); + + // Await the `JobRemoved` signal. + let signal = signals.next().await; + + // Unsubscribe from `JobRemoved` signals. + // If `signals` goes out of scope prematurely due to an error + // then the subscription is cancelled synchronously in the + // destructor of `SignalStream`. + let _ = signals.into_inner().into_inner().close().await; match signal { Some(message) if message.result == JobRemovedResult::Done => Ok(()), From 411ae92fba4749742f41e26ebb872c1a8dacf270 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Thu, 20 May 2021 14:59:48 +0200 Subject: [PATCH 8/9] Documentation improved --- src/provider/systemdmanager/manager.rs | 29 +++++++++++++++++--------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 7900a89..8dbf7bf 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -281,7 +281,7 @@ impl SystemdManager { debug!("Trying to start unit [{}]", unit); let result = self - .process_job(|proxy| proxy.start_unit(unit, StartMode::Fail)) + .call_method(|proxy| proxy.start_unit(unit, StartMode::Fail)) .await; if result.is_ok() { @@ -299,7 +299,7 @@ impl SystemdManager { debug!("Trying to stop systemd unit [{}]", unit); let result = self - .process_job(|proxy| proxy.stop_unit(unit, StopMode::Fail)) + .call_method(|proxy| proxy.stop_unit(unit, StopMode::Fail)) .await; if result.is_ok() { @@ -309,30 +309,39 @@ impl SystemdManager { result.map_err(|e| anyhow!("Error stopping service [{}]: {}", unit, e)) } - /// Runs the given task and waits until the job returned by the task - /// is finished. - async fn process_job<'a, F, Fut>(&'a self, task: F) -> anyhow::Result<()> + /// Calls a systemd method and waits until the dependent job is + /// finished. + /// + /// The given method enqueues a job in systemd and returns the job + /// object. Systemd sends out a `JobRemoved` signal when the job is + /// dequeued. The signal contains the reason for the dequeuing like + /// `"done"`, `"failed"`, or `"canceled"`. + /// + /// This function subscribes to `JobRemoved` signals, calls the + /// given method, awaits the signal for the corresponding job, and + /// returns `Ok(())` if the result is [`JobRemovedResult::Done`]. + /// If the signal contains another result or no signal is returned + /// (which should never happen) then an error with a corresponding + /// message is returned. + async fn call_method<'a, F, Fut>(&'a self, method: F) -> anyhow::Result<()> where F: Fn(&'a AsyncManagerProxy) -> Fut, Fut: Future>>, { - // Listen for `JobRemoved` signals. let signals = self .proxy .receive_signal(ManagerSignals::JobRemoved.into()) .await? .map(|message| message.body::().unwrap()); - let job = task(&self.proxy).await?; + let job = method(&self.proxy).await?; - // Narrow signal stream down to the corresponding job. let mut signals = signals .filter(|signal| future::ready(&signal.job.to_owned().into_inner() == job.path())); - // Await the `JobRemoved` signal. let signal = signals.next().await; - // Unsubscribe from `JobRemoved` signals. + // Unsubscribe from receiving signals. // If `signals` goes out of scope prematurely due to an error // then the subscription is cancelled synchronously in the // destructor of `SignalStream`. From ba7cbf366537c850114a40390081c7e7a4cc26d6 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Thu, 20 May 2021 15:24:01 +0200 Subject: [PATCH 9/9] Version set to 0.2.0 and changelog updated --- CHANGELOG.adoc | 6 +++++- Cargo.lock | 2 +- Cargo.toml | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 6b17cdb..df75284 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -1,9 +1,13 @@ = Changelog -== 0.1.1 - 2021-05-20 +== 0.2.0 - 2021-05-20 +:159: https://github.com/stackabletech/agent/pull/159[#159] :164: https://github.com/stackabletech/agent/pull/164[#164] +=== Added +* Templating facility added to the `config-directory` parameter ({159}). + === Fixed * Pod state synchronized with systemd service state ({164}). diff --git a/Cargo.lock b/Cargo.lock index 79b9534..11d1f69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2696,7 +2696,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "stackable-agent" -version = "0.1.1" +version = "0.2.0" dependencies = [ "Inflector", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 2df289a..b0fbbdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "stackable-agent" description = "The component of the Stackable Platform that manages installation of services on the workers" -version = "0.1.1" +version = "0.2.0" authors = ["Sönke Liebau "] edition = "2018" license = "Apache-2.0"