From 1b708665050a69d6a4ef0afd4ce7a0a0b2d9cd75 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Fri, 9 Apr 2021 17:19:52 +0200
Subject: [PATCH 01/12] Logs forwarded from systemd journal (wip)

---
 Cargo.lock                                    | 80 ++++++++++++++++++-
 Cargo.toml                                    |  3 +-
 src/provider/mod.rs                           | 55 ++++++++++---
 src/provider/states/pod/starting.rs           | 41 ++++++++--
 src/provider/systemdmanager/journal_reader.rs | 50 ++++++++++++
 src/provider/systemdmanager/mod.rs            |  1 +
 6 files changed, 211 insertions(+), 19 deletions(-)
 create mode 100644 src/provider/systemdmanager/journal_reader.rs

diff --git a/Cargo.lock b/Cargo.lock
index bdd62f7..9f7e7ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -221,6 +221,12 @@ dependencies = [
  "safemem",
 ]
 
+[[package]]
+name = "build-env"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6cf89846ef2b2674ef1c153256cec98fba587c72bf4ea2c4b2f6d91a19f55926"
+
 [[package]]
 name = "bumpalo"
 version = "3.6.1"
@@ -373,6 +379,16 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "cstr-argument"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20bd4e8067c20c7c3a4dea759ef91d4b18418ddb5bd8837ef6e2f2f93ca7ccbb"
+dependencies = [
+ "cfg-if 0.1.10",
+ "memchr",
+]
+
 [[package]]
 name = "dashmap"
 version = "4.0.2"
@@ -576,7 +592,28 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
 dependencies = [
- "foreign-types-shared",
+ "foreign-types-shared 0.1.1",
+]
+
+[[package]]
+name = "foreign-types"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965"
+dependencies = [
+ "foreign-types-macros",
+ "foreign-types-shared 0.3.0",
+]
+
+[[package]]
+name = "foreign-types-macros"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "63f713f8b2aa9e24fec85b0e290c56caee12e3b6ae0aeeda238a75b28251afd6"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
 ]
 
 [[package]]
@@ -585,6 +622,12 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
 
+[[package]]
+name = "foreign-types-shared"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7684cf33bb7f28497939e8c7cf17e3e4e3b8d9a0080ffa4f8ae2f515442ee855"
+
 [[package]]
 name = "form_urlencoded"
 version = "1.0.1"
@@ -1331,6 +1374,17 @@ dependencies = [
  "pkg-config",
 ]
 
+[[package]]
+name = "libsystemd-sys"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e03fd580bcecda68dcdcd5297085ade6a3dc552cd8b030d2b94a9b089ef7ab8"
+dependencies = [
+ "build-env",
+ "libc",
+ "pkg-config",
+]
+
 [[package]]
 name = "linked-hash-map"
 version = "0.5.4"
@@ -1627,7 +1681,7 @@ checksum = "a61075b62a23fef5a29815de7536d940aa35ce96d18ce0cc5076272db678a577"
 dependencies = [
  "bitflags",
  "cfg-if 1.0.0",
- "foreign-types",
+ "foreign-types 0.3.2",
  "libc",
  "once_cell",
  "openssl-sys",
@@ -2582,6 +2636,7 @@ dependencies = [
  "stackable_config",
  "strum",
  "strum_macros",
+ "systemd",
  "tar",
  "thiserror",
  "tokio 1.5.0",
@@ -2724,6 +2779,21 @@ dependencies = [
  "unicode-xid",
 ]
 
+[[package]]
+name = "systemd"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f722cabda922e471742300045f56dbaa53fafbb4520fca304e51258019bfe91d"
+dependencies = [
+ "cstr-argument",
+ "foreign-types 0.5.0",
+ "libc",
+ "libsystemd-sys",
+ "log",
+ "memchr",
+ "utf8-cstr",
+]
+
 [[package]]
 name = "tar"
 version = "0.4.33"
@@ -3276,6 +3346,12 @@ version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
 
+[[package]]
+name = "utf8-cstr"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "55bcbb425141152b10d5693095950b51c3745d019363fc2929ffd8f61449b628"
+
 [[package]]
 name = "uuid"
 version = "0.8.2"
diff --git a/Cargo.toml b/Cargo.toml
index 08dad1e..e51cbe5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,7 @@ shellexpand = "2.1"
 stackable_config = { git = "https://github.com/stackabletech/common.git", branch = "main" }
 strum = { version = "0.20", features = ["derive"] }
 strum_macros = "0.20"
+systemd = "0.8"
 tar = "0.4"
 thiserror = "1.0"
 tokio = { version = "1.5", features = ["macros", "rt-multi-thread", "time"] }
@@ -56,4 +57,4 @@ systemd-units = { enable = false }
 assets = [
   ["packaging/config/agent.conf", "etc/stackable/stackable-agent/", "644"],
   ["target/release/stackable-agent", "opt/stackable-agent/stackable-agent", "755"],
-]
\ No newline at end of file
+]
diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index 4f6589e..94a4ede 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -7,12 +7,12 @@ use anyhow::anyhow;
 use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
 use kube::{Api, Client};
 use kubelet::backoff::ExponentialBackoffStrategy;
-use kubelet::log::Sender;
+use kubelet::log::{HandleFactory, Sender};
 use kubelet::node::Builder;
 use kubelet::pod::state::prelude::*;
-use kubelet::pod::Pod;
-use kubelet::provider::Provider;
-use log::{debug, error};
+use kubelet::pod::{Handle, Pod, PodKey};
+use kubelet::{handle::StopHandler, provider::Provider};
+use log::{debug, error, info};
 use tokio::sync::RwLock;
 
 use crate::provider::error::StackableError;
@@ -25,7 +25,8 @@ use crate::provider::states::pod::terminated::Terminated;
 use crate::provider::states::pod::PodState;
 use crate::provider::systemdmanager::manager::SystemdManager;
 use kube::error::ErrorResponse;
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};
+use systemdmanager::journal_reader::JournalReader;
 
 pub struct StackableProvider {
     shared: ProviderState,
@@ -42,9 +43,37 @@ mod repository;
 mod states;
 mod systemdmanager;
 
+pub struct Runtime {
+    service_unit: String,
+}
+
+#[async_trait::async_trait]
+impl StopHandler for Runtime {
+    async fn stop(&mut self) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    async fn wait(&mut self) -> anyhow::Result<()> {
+        Ok(())
+    }
+}
+
+pub struct HF {
+    journal_reader: JournalReader,
+}
+
+impl HandleFactory<JournalReader> for HF {
+    fn new_handle(&self) -> JournalReader {
+        self.journal_reader.clone()
+    }
+}
+
+type PodHandleMap = Arc<RwLock<HashMap<PodKey, Arc<Handle<Runtime, HF>>>>>;
+
 /// Provider-level state shared between all pods
 #[derive(Clone)]
 pub struct ProviderState {
+    handles: PodHandleMap,
     client: Client,
     systemd_manager: Arc<SystemdManager>,
 }
@@ -61,6 +90,7 @@ impl StackableProvider {
         let systemd_manager = Arc::new(SystemdManager::new(session, Duration::from_secs(5))?);
 
         let provider_state = ProviderState {
+            handles: Default::default(),
             client,
             systemd_manager,
         };
@@ -193,12 +223,17 @@ impl Provider for StackableProvider {
 
     async fn logs(
         &self,
-        _namespace: String,
-        _pod: String,
-        _container: String,
-        _sender: Sender,
+        namespace: String,
+        pod: String,
+        container: String,
+        sender: Sender,
     ) -> anyhow::Result<()> {
-        Ok(())
+        info!("Logs requested");
+        let mut handles = self.shared.handles.write().await;
+        let handle = handles
+            .get_mut(&PodKey::new(&namespace, &pod))
+            .ok_or_else(|| anyhow!("Pod [{:?}] not found", pod))?;
+        handle.output(&container, sender).await
     }
 }
 
diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index 5006528..1ec995e 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -1,13 +1,16 @@
 use kubelet::pod::state::prelude::*;
-use kubelet::pod::Pod;
+use kubelet::{
+    container::{ContainerKey, Handle as ContainerHandle},
+    pod::{Handle as PodHandle, Pod, PodKey},
+};
 
 use super::failed::Failed;
 use super::running::Running;
 use super::setup_failed::SetupFailed;
-use crate::provider::{PodState, ProviderState};
+use crate::provider::{JournalReader, PodState, ProviderState, Runtime, HF};
 use anyhow::anyhow;
 use log::{debug, error, info, warn};
-use std::time::Instant;
+use std::{collections::HashMap, sync::Arc, time::Instant};
 use tokio::time::Duration;
 
 #[derive(Default, Debug, TransitionTo)]
@@ -18,12 +21,12 @@ pub struct Starting;
 impl State<PodState> for Starting {
     async fn next(
         self: Box<Self>,
-        provider_state: SharedState<ProviderState>,
+        shared: SharedState<ProviderState>,
         pod_state: &mut PodState,
-        _: Manifest<Pod>,
+        pod: Manifest<Pod>,
     ) -> Transition<PodState> {
         let systemd_manager = {
-            let provider_state = provider_state.read().await;
+            let provider_state = shared.read().await;
             provider_state.systemd_manager.clone()
         };
 
@@ -71,6 +74,32 @@ impl State<PodState> for Starting {
                     return Transition::Complete(Err(enable_error));
                 }
 
+                info!("Creating container handle");
+                {
+                    let pod = pod.latest();
+                    let provider_state = shared.write().await;
+                    let pod_key = PodKey::from(pod.clone());
+                    info!("Pod [{:?}] inserted into handles", pod_key);
+                    let mut handles_writer = provider_state.handles.write().await;
+                    let pod_handle = handles_writer.entry(pod_key).or_insert_with(|| {
+                        Arc::new(PodHandle::new(HashMap::new(), pod.clone(), None))
+                    });
+                    let container_handle = ContainerHandle::new(
+                        Runtime {
+                            service_unit: unit.get_name(),
+                        },
+                        HF {
+                            journal_reader: JournalReader { end: false },
+                        },
+                    );
+                    pod_handle
+                        .insert_container_handle(
+                            ContainerKey::App(String::from("test-service")),
+                            container_handle,
+                        )
+                        .await;
+                }
+
                 let start_time = Instant::now();
                 // TODO: does this need to be configurable, or ar we happy with a hard coded value
                 // for now. I've briefly looked at the podspec and couldn't identify a good field
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
new file mode 100644
index 0000000..e98a434
--- /dev/null
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -0,0 +1,50 @@
+use log::info;
+use std::{
+    io::{Result, SeekFrom},
+    pin::Pin,
+    task::{Context, Poll},
+};
+use systemd::journal;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
+
+#[derive(Clone)]
+pub struct JournalReader {
+    pub end: bool,
+}
+
+impl AsyncRead for JournalReader {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<()>> {
+        if !self.end {
+            self.end = true;
+            info!("Put Hallo into the buffer");
+            let mut journal = journal::OpenOptions::default()
+                .open()
+                .expect("Journal could not be opened");
+            if let Ok(journal) = journal.match_add(
+                "_SYSTEMD_USER_UNIT",
+                "default-agent-integration-test-test-service.service",
+            ) {
+                if let Ok(Some(entry)) = journal.next_entry() {
+                    buf.put_slice(format!("{:?}", entry).as_bytes());
+                }
+            }
+        }
+        Poll::Ready(Ok(()))
+    }
+}
+
+impl AsyncSeek for JournalReader {
+    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> {
+        info!("Seek to {:?}", position);
+        Ok(())
+    }
+
+    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<u64>> {
+        info!("Poll complete");
+        Poll::Ready(Ok(0))
+    }
+}
diff --git a/src/provider/systemdmanager/mod.rs b/src/provider/systemdmanager/mod.rs
index d87d4b5..8c2df7b 100644
--- a/src/provider/systemdmanager/mod.rs
+++ b/src/provider/systemdmanager/mod.rs
@@ -1,2 +1,3 @@
+pub mod journal_reader;
 pub mod manager;
 pub mod systemdunit;

From 66da356446220f8118fe5de87a54cff933961eaf Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Mon, 12 Apr 2021 14:09:12 +0200
Subject: [PATCH 02/12] Journal entries filtered by invocation ID

---
 src/provider/states/pod/starting.rs           | 61 +++++++++++--------
 src/provider/systemdmanager/journal_reader.rs |  9 ++-
 src/provider/systemdmanager/manager.rs        | 26 +++++---
 3 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index 1ec995e..1678eaf 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -74,32 +74,6 @@ impl State<PodState> for Starting {
                     return Transition::Complete(Err(enable_error));
                 }
 
-                info!("Creating container handle");
-                {
-                    let pod = pod.latest();
-                    let provider_state = shared.write().await;
-                    let pod_key = PodKey::from(pod.clone());
-                    info!("Pod [{:?}] inserted into handles", pod_key);
-                    let mut handles_writer = provider_state.handles.write().await;
-                    let pod_handle = handles_writer.entry(pod_key).or_insert_with(|| {
-                        Arc::new(PodHandle::new(HashMap::new(), pod.clone(), None))
-                    });
-                    let container_handle = ContainerHandle::new(
-                        Runtime {
-                            service_unit: unit.get_name(),
-                        },
-                        HF {
-                            journal_reader: JournalReader { end: false },
-                        },
-                    );
-                    pod_handle
-                        .insert_container_handle(
-                            ContainerKey::App(String::from("test-service")),
-                            container_handle,
-                        )
-                        .await;
-                }
-
                 let start_time = Instant::now();
                 // TODO: does this need to be configurable, or ar we happy with a hard coded value
                 // for now. I've briefly looked at the podspec and couldn't identify a good field
@@ -130,6 +104,41 @@ impl State<PodState> for Starting {
                         Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
                     }
                 }
+
+                info!("Creating container handle");
+                {
+                    let pod = pod.latest();
+                    let provider_state = shared.write().await;
+                    let pod_key = PodKey::from(pod.clone());
+                    info!("Pod [{:?}] inserted into handles", pod_key);
+                    let mut handles_writer = provider_state.handles.write().await;
+                    let pod_handle = handles_writer.entry(pod_key).or_insert_with(|| {
+                        Arc::new(PodHandle::new(HashMap::new(), pod.clone(), None))
+                    });
+                    match systemd_manager.get_invocation_id(&unit.get_name()) {
+                        Ok(invocation_id) => {
+                            info!("InvocationID: {}", invocation_id);
+                            let container_handle = ContainerHandle::new(
+                                Runtime {
+                                    service_unit: unit.get_name(),
+                                },
+                                HF {
+                                    journal_reader: JournalReader {
+                                        invocation_id,
+                                        end: false,
+                                    },
+                                },
+                            );
+                            pod_handle
+                                .insert_container_handle(
+                                    ContainerKey::App(String::from("test-service")),
+                                    container_handle,
+                                )
+                                .await;
+                        }
+                        Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
+                    }
+                }
             }
         } else {
             warn!(
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index e98a434..ff0fae7 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -9,6 +9,7 @@ use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
 #[derive(Clone)]
 pub struct JournalReader {
+    pub invocation_id: String,
     pub end: bool,
 }
 
@@ -20,14 +21,12 @@ impl AsyncRead for JournalReader {
     ) -> Poll<Result<()>> {
         if !self.end {
             self.end = true;
-            info!("Put Hallo into the buffer");
             let mut journal = journal::OpenOptions::default()
                 .open()
                 .expect("Journal could not be opened");
-            if let Ok(journal) = journal.match_add(
-                "_SYSTEMD_USER_UNIT",
-                "default-agent-integration-test-test-service.service",
-            ) {
+            if let Ok(journal) =
+                journal.match_add("_SYSTEMD_INVOCATION_ID", self.invocation_id.clone())
+            {
                 if let Ok(Some(entry)) = journal.next_entry() {
                     buf.put_slice(format!("{:?}", entry).as_bytes());
                 }
diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs
index 1c55a81..ba08ec2 100644
--- a/src/provider/systemdmanager/manager.rs
+++ b/src/provider/systemdmanager/manager.rs
@@ -7,7 +7,7 @@ use crate::provider::systemdmanager::systemdunit::SystemDUnit;
 use crate::provider::StackableError;
 use crate::provider::StackableError::RuntimeError;
 use anyhow::anyhow;
-use dbus::arg::{AppendAll, ReadAll, Variant};
+use dbus::arg::{AppendAll, Get, ReadAll, RefArg, Variant};
 use dbus::blocking::SyncConnection;
 use dbus::strings::Member;
 use dbus::Path;
@@ -374,6 +374,20 @@ impl SystemdManager {
     }
 
     pub fn is_running(&self, unit: &str) -> Result<bool, anyhow::Error> {
+        self.get_value::<String>(unit, "ActiveState")
+            .map(|v| v.as_str() == Some("active"))
+    }
+
+    pub fn get_invocation_id(&self, unit: &str) -> Result<String, anyhow::Error> {
+        self.get_value::<Vec<u8>>(unit, "InvocationID")
+            .map(|Variant(vec)| vec.iter().map(|byte| format!("{:02x}", byte)).collect())
+    }
+
+    pub fn get_value<T: for<'a> Get<'a>>(
+        &self,
+        unit: &str,
+        property: &str,
+    ) -> Result<Variant<T>, anyhow::Error> {
         // We are using `LoadUnit` here, as GetUnit can fail seemingly at random, when the unit
         // is not loaded due to systemd garbage collection.
         // see https://github.com/systemd/systemd/issues/1929 for more information
@@ -385,17 +399,13 @@ impl SystemdManager {
             .connection
             .with_proxy(SYSTEMD_DESTINATION, &unit_node, self.timeout);
 
-        let active_state = proxy
+        Ok(proxy
             .method_call(
                 DBUS_PROPERTIES_INTERFACE,
                 "Get",
-                ("org.freedesktop.systemd1.Unit", "ActiveState"),
+                ("org.freedesktop.systemd1.Unit", property),
             )
-            .map(|r: (Variant<String>,)| r.0)?;
-
-        // TODO: I think this can panic, there should be a get() method on Variant that returns
-        // an option, but I've not yet been able to get that to work
-        Ok(active_state.0 == "active")
+            .map(|r: (Variant<T>,)| r.0)?)
     }
 
     // Symlink a unit file into the systemd unit folder

From 976966a0e60dcec367e3466b604dd4b6c72a65e5 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Tue, 13 Apr 2021 16:11:07 +0200
Subject: [PATCH 03/12] All requested journal entries are returned

---
 src/provider/states/pod/starting.rs           |  5 +-
 src/provider/systemdmanager/journal_reader.rs | 95 +++++++++++++++++--
 2 files changed, 87 insertions(+), 13 deletions(-)

diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index 1678eaf..5738ff4 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -123,10 +123,7 @@ impl State<PodState> for Starting {
                                     service_unit: unit.get_name(),
                                 },
                                 HF {
-                                    journal_reader: JournalReader {
-                                        invocation_id,
-                                        end: false,
-                                    },
+                                    journal_reader: JournalReader::new(&invocation_id)
                                 },
                             );
                             pod_handle
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index ff0fae7..df7d762 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -1,5 +1,6 @@
 use log::info;
 use std::{
+    cmp,
     io::{Result, SeekFrom},
     pin::Pin,
     task::{Context, Poll},
@@ -9,8 +10,19 @@ use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
 #[derive(Clone)]
 pub struct JournalReader {
-    pub invocation_id: String,
-    pub end: bool,
+    invocation_id: String,
+    cursor: Option<String>,
+    buffer: Vec<u8>,
+}
+
+impl JournalReader {
+    pub fn new(invocation_id: &str) -> JournalReader {
+        JournalReader {
+            invocation_id: String::from(invocation_id),
+            cursor: None,
+            buffer: Vec::new(),
+        }
+    }
 }
 
 impl AsyncRead for JournalReader {
@@ -19,19 +31,84 @@ impl AsyncRead for JournalReader {
         _cx: &mut Context<'_>,
         buf: &mut ReadBuf<'_>,
     ) -> Poll<Result<()>> {
-        if !self.end {
-            self.end = true;
+        info!("poll_read");
+        info!("buf remaining: [{}]", buf.remaining());
+
+        if !self.buffer.is_empty() && buf.remaining() != 0 {
+            let len = cmp::min(self.buffer.len(), buf.remaining());
+            let data = self.buffer.drain(0..len).collect::<Vec<u8>>();
+            info!("Write buffer with length [{}] to buf", len);
+            buf.put_slice(&data);
+        }
+
+        // TODO unexpect and unpanic code
+        if buf.remaining() != 0 {
+            info!("Write journal entries to buf");
             let mut journal = journal::OpenOptions::default()
                 .open()
                 .expect("Journal could not be opened");
-            if let Ok(journal) =
-                journal.match_add("_SYSTEMD_INVOCATION_ID", self.invocation_id.clone())
-            {
-                if let Ok(Some(entry)) = journal.next_entry() {
-                    buf.put_slice(format!("{:?}", entry).as_bytes());
+            let journal = journal
+                .match_add("_SYSTEMD_INVOCATION_ID", self.invocation_id.clone())
+                .expect("Journal could not be applied");
+
+            info!("Set threshold");
+            // TODO Define a good default
+            journal
+                .set_data_threshold(1000)
+                .expect("Cannot set threshold");
+
+            info!("Set cursor");
+            match &self.cursor {
+                None => journal.seek_head().expect("Seek head failed"),
+                Some(cursor) => {
+                    journal.seek_cursor(cursor).expect("Seek cursor failed");
+                    journal.next().expect("Could not get next journal entry.");
+                }
+            };
+
+            let mut eof = false;
+            while buf.remaining() != 0 && !eof {
+                info!("Get next journal entry");
+                if journal.next().expect("Could not get next journal entry.") != 0 {
+                    info!("Get journal data");
+                    match journal
+                        .get_data("MESSAGE")
+                        .expect("Data could not be retrieved")
+                    {
+                        Some(message) => {
+                            info!("Write message [{:?}] to buf", message);
+                            if let Some(value) = message.value() {
+                                let mut data = value.to_vec();
+                                // TODO explain number
+                                data.push(10);
+                                let len = cmp::min(data.len(), buf.remaining());
+                                let data_to_buffer = data.split_off(len);
+                                info!("Write data with length [{}] to buf", data.len());
+                                info!(
+                                    "Write data with length [{}] to buffer",
+                                    data_to_buffer.len()
+                                );
+                                self.buffer = data_to_buffer;
+                                buf.put_slice(&data);
+                            }
+                        }
+                        None => {
+                            info!("no message content; eof reached");
+                            eof = true;
+                        }
+                    }
+                } else {
+                    info!("eof reached");
+                    eof = true;
                 }
             }
+
+            info!("Update cursor");
+            self.cursor = Some(journal.cursor().expect("Cannot retrieve cursor"));
+            info!("New cursor: [{:?}]", self.cursor);
         }
+
+        info!("Signal poll ready");
         Poll::Ready(Ok(()))
     }
 }

From ebc60bd74b15584f2b1bf12d5355981ac7a04b42 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Thu, 15 Apr 2021 17:24:37 +0200
Subject: [PATCH 04/12] Journal reader implemented without kubelet

---
 src/provider/mod.rs                           |  68 +++----
 src/provider/states/pod/starting.rs           |  34 ++--
 src/provider/systemdmanager/journal_reader.rs | 181 +++++++-----------
 src/provider/systemdmanager/systemdunit.rs    |   4 +
 4 files changed, 123 insertions(+), 164 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index 94a4ede..07ee25e 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -7,13 +7,16 @@ use anyhow::anyhow;
 use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
 use kube::{Api, Client};
 use kubelet::backoff::ExponentialBackoffStrategy;
-use kubelet::log::{HandleFactory, Sender};
+use kubelet::log::Sender;
 use kubelet::node::Builder;
 use kubelet::pod::state::prelude::*;
-use kubelet::pod::{Handle, Pod, PodKey};
-use kubelet::{handle::StopHandler, provider::Provider};
+use kubelet::pod::{Pod, PodKey};
+use kubelet::{
+    container::{ContainerKey, ContainerMap},
+    provider::Provider,
+};
 use log::{debug, error, info};
-use tokio::sync::RwLock;
+use tokio::{runtime::Runtime, sync::RwLock, task};
 
 use crate::provider::error::StackableError;
 use crate::provider::error::StackableError::{
@@ -26,7 +29,7 @@ use crate::provider::states::pod::PodState;
 use crate::provider::systemdmanager::manager::SystemdManager;
 use kube::error::ErrorResponse;
 use std::{collections::HashMap, time::Duration};
-use systemdmanager::journal_reader::JournalReader;
+use systemdmanager::journal_reader;
 
 pub struct StackableProvider {
     shared: ProviderState,
@@ -43,37 +46,20 @@ mod repository;
 mod states;
 mod systemdmanager;
 
-pub struct Runtime {
-    service_unit: String,
-}
-
-#[async_trait::async_trait]
-impl StopHandler for Runtime {
-    async fn stop(&mut self) -> anyhow::Result<()> {
-        Ok(())
-    }
-
-    async fn wait(&mut self) -> anyhow::Result<()> {
-        Ok(())
-    }
+pub struct ContainerHandle {
+    invocation_id: String,
 }
 
-pub struct HF {
-    journal_reader: JournalReader,
+pub struct PodHandle {
+    containers: ContainerMap<ContainerHandle>,
 }
 
-impl HandleFactory<JournalReader> for HF {
-    fn new_handle(&self) -> JournalReader {
-        self.journal_reader.clone()
-    }
-}
-
-type PodHandleMap = Arc<RwLock<HashMap<PodKey, Arc<Handle<Runtime, HF>>>>>;
+type PodHandleMap = HashMap<PodKey, PodHandle>;
 
 /// Provider-level state shared between all pods
 #[derive(Clone)]
 pub struct ProviderState {
-    handles: PodHandleMap,
+    handles: Arc<RwLock<PodHandleMap>>,
     client: Client,
     systemd_manager: Arc<SystemdManager>,
 }
@@ -226,14 +212,30 @@ impl Provider for StackableProvider {
         namespace: String,
         pod: String,
         container: String,
-        sender: Sender,
+        mut sender: Sender,
     ) -> anyhow::Result<()> {
         info!("Logs requested");
-        let mut handles = self.shared.handles.write().await;
-        let handle = handles
-            .get_mut(&PodKey::new(&namespace, &pod))
+        let handles = self.shared.handles.write().await;
+        let pod_handle = handles
+            .get(&PodKey::new(&namespace, &pod))
             .ok_or_else(|| anyhow!("Pod [{:?}] not found", pod))?;
-        handle.output(&container, sender).await
+        let container_handle = pod_handle
+            .containers
+            .get(&ContainerKey::App(container.clone()))
+            .ok_or_else(|| anyhow!("Container [{:?}] not found", container))?;
+        let invocation_id = container_handle.invocation_id.clone();
+
+        task::spawn_blocking(move || {
+            Runtime::new()
+                .unwrap()
+                .block_on(journal_reader::send_journal_entries(
+                    &mut sender,
+                    &invocation_id,
+                ))
+                .unwrap();
+        });
+
+        Ok(())
     }
 }
 
diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index 5738ff4..ff2edf1 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -1,16 +1,16 @@
 use kubelet::pod::state::prelude::*;
 use kubelet::{
-    container::{ContainerKey, Handle as ContainerHandle},
-    pod::{Handle as PodHandle, Pod, PodKey},
+    container::ContainerKey,
+    pod::{Pod, PodKey},
 };
 
 use super::failed::Failed;
 use super::running::Running;
 use super::setup_failed::SetupFailed;
-use crate::provider::{JournalReader, PodState, ProviderState, Runtime, HF};
+use crate::provider::{ContainerHandle, PodHandle, PodState, ProviderState};
 use anyhow::anyhow;
 use log::{debug, error, info, warn};
-use std::{collections::HashMap, sync::Arc, time::Instant};
+use std::{collections::HashMap, time::Instant};
 use tokio::time::Duration;
 
 #[derive(Default, Debug, TransitionTo)]
@@ -112,26 +112,20 @@ impl State<PodState> for Starting {
                     let pod_key = PodKey::from(pod.clone());
                     info!("Pod [{:?}] inserted into handles", pod_key);
                     let mut handles_writer = provider_state.handles.write().await;
-                    let pod_handle = handles_writer.entry(pod_key).or_insert_with(|| {
-                        Arc::new(PodHandle::new(HashMap::new(), pod.clone(), None))
+                    let pod_handle = handles_writer.entry(pod_key).or_insert_with(|| PodHandle {
+                        containers: HashMap::new(),
                     });
                     match systemd_manager.get_invocation_id(&unit.get_name()) {
                         Ok(invocation_id) => {
-                            info!("InvocationID: {}", invocation_id);
-                            let container_handle = ContainerHandle::new(
-                                Runtime {
-                                    service_unit: unit.get_name(),
-                                },
-                                HF {
-                                    journal_reader: JournalReader::new(&invocation_id)
-                                },
+                            pod_handle.containers.insert(
+                                ContainerKey::App(
+                                    unit.container_name
+                                        .as_ref()
+                                        .expect("Container name is missing")
+                                        .clone(),
+                                ),
+                                ContainerHandle { invocation_id },
                             );
-                            pod_handle
-                                .insert_container_handle(
-                                    ContainerKey::App(String::from("test-service")),
-                                    container_handle,
-                                )
-                                .await;
                         }
                         Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
                     }
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index df7d762..403359e 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -1,126 +1,85 @@
-use log::info;
-use std::{
-    cmp,
-    io::{Result, SeekFrom},
-    pin::Pin,
-    task::{Context, Poll},
-};
-use systemd::journal;
-use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
+use kubelet::log::Sender;
+use std::str;
+use systemd::{journal, journal::JournalRef};
 
-#[derive(Clone)]
-pub struct JournalReader {
-    invocation_id: String,
-    cursor: Option<String>,
-    buffer: Vec<u8>,
-}
+const MAX_LOG_LINE_LENGTH: usize = 16384;
 
-impl JournalReader {
-    pub fn new(invocation_id: &str) -> JournalReader {
-        JournalReader {
-            invocation_id: String::from(invocation_id),
-            cursor: None,
-            buffer: Vec::new(),
-        }
-    }
-}
+pub async fn send_journal_entries(sender: &mut Sender, invocation_id: &str) -> anyhow::Result<()> {
+    let mut journal = journal::OpenOptions::default().open()?;
+    let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?;
 
-impl AsyncRead for JournalReader {
-    fn poll_read(
-        mut self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-        buf: &mut ReadBuf<'_>,
-    ) -> Poll<Result<()>> {
-        info!("poll_read");
-        info!("buf remaining: [{}]", buf.remaining());
+    journal.set_data_threshold(MAX_LOG_LINE_LENGTH)?;
 
-        if !self.buffer.is_empty() && buf.remaining() != 0 {
-            let len = cmp::min(self.buffer.len(), buf.remaining());
-            let data = self.buffer.drain(0..len).collect::<Vec<u8>>();
-            info!("Write buffer with length [{}] to buf", len);
-            buf.put_slice(&data);
-        }
+    if let Some(line_count) = sender.tail() {
+        journal.seek_tail()?;
+        journal.previous_skip(line_count as u64 + 1)?;
 
-        // TODO unexpect and unpanic code
-        if buf.remaining() != 0 {
-            info!("Write journal entries to buf");
-            let mut journal = journal::OpenOptions::default()
-                .open()
-                .expect("Journal could not be opened");
-            let journal = journal
-                .match_add("_SYSTEMD_INVOCATION_ID", self.invocation_id.clone())
-                .expect("Journal could not be applied");
-
-            info!("Set threshold");
-            // TODO Define a good default
-            journal
-                .set_data_threshold(1000)
-                .expect("Cannot set threshold");
-
-            info!("Set cursor");
-            match &self.cursor {
-                None => journal.seek_head().expect("Seek head failed"),
-                Some(cursor) => {
-                    journal.seek_cursor(cursor).expect("Seek cursor failed");
-                    journal.next().expect("Could not get next journal entry.");
-                }
-            };
-
-            let mut eof = false;
-            while buf.remaining() != 0 && !eof {
-                info!("Get next journal entry");
-                if journal.next().expect("Could not get next journal entry.") != 0 {
-                    info!("Get journal data");
-                    match journal
-                        .get_data("MESSAGE")
-                        .expect("Data could not be retrieved")
-                    {
-                        Some(message) => {
-                            info!("Write message [{:?}] to buf", message);
-                            if let Some(value) = message.value() {
-                                let mut data = value.to_vec();
-                                // TODO explain number
-                                data.push(10);
-                                let len = cmp::min(data.len(), buf.remaining());
-                                let data_to_buffer = data.split_off(len);
-                                info!("Write data with length [{}] to buf", data.len());
-                                info!(
-                                    "Write data with length [{}] to buffer",
-                                    data_to_buffer.len()
-                                );
-                                self.buffer = data_to_buffer;
-                                buf.put_slice(&data);
-                            }
-                        }
-                        None => {
-                            info!("no message content; eof reached");
-                            eof = true;
-                        }
-                    }
-                } else {
-                    info!("eof reached");
-                    eof = true;
-                }
-            }
-
-            info!("Update cursor");
-            self.cursor = Some(journal.cursor().expect("Cannot retrieve cursor"));
-            info!("New cursor: [{:?}]", self.cursor);
+        if sender.follow() {
+            send_remaining_messages(journal, sender).await?;
+        } else {
+            send_n_messages(journal, sender, line_count).await?;
         }
+    } else {
+        send_remaining_messages(journal, sender).await?;
+    }
 
-        info!("Signal poll ready");
-        Poll::Ready(Ok(()))
+    while sender.follow() {
+        journal.wait(None)?;
+        send_remaining_messages(journal, sender).await?;
     }
+
+    Ok(())
 }
 
-impl AsyncSeek for JournalReader {
-    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> {
-        info!("Seek to {:?}", position);
-        Ok(())
+async fn send_n_messages(
+    journal: &mut JournalRef,
+    sender: &mut Sender,
+    count: usize,
+) -> anyhow::Result<()> {
+    let mut sent = 0;
+    let mut message_available = true;
+    while sent != count && message_available {
+        if let Some(message) = next_message(journal)? {
+            send_message(sender, &message).await?;
+            sent += 1;
+        } else {
+            message_available = false;
+        }
     }
+    Ok(())
+}
 
-    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<u64>> {
-        info!("Poll complete");
-        Poll::Ready(Ok(0))
-    }
+async fn send_remaining_messages(
+    journal: &mut JournalRef,
+    sender: &mut Sender,
+) -> anyhow::Result<()> {
+    while let Some(message) = next_message(journal)? {
+        send_message(sender, &message).await?;
+    }
+    Ok(())
+}
+
+fn next_message(journal: &mut JournalRef) -> anyhow::Result<Option<String>> {
+    let maybe_message = if journal.next()? != 0 {
+        let message = if let Some(entry) = journal.get_data("MESSAGE")? {
+            if let Some(value) = entry.value() {
+                String::from_utf8_lossy(value).into()
+            } else {
+                String::new()
+            }
+        } else {
+            String::new()
+        };
+        Some(message)
+    } else {
+        None
+    };
+    Ok(maybe_message)
+}
+
+async fn send_message(sender: &mut Sender, message: &str) -> anyhow::Result<()> {
+    let mut line = message.to_owned();
+    line.push('\n');
+    sender.send(line).await?;
+    Ok(())
 }
diff --git a/src/provider/systemdmanager/systemdunit.rs b/src/provider/systemdmanager/systemdunit.rs
index 2b671f2..0398ae7 100644
--- a/src/provider/systemdmanager/systemdunit.rs
+++ b/src/provider/systemdmanager/systemdunit.rs
@@ -52,6 +52,7 @@ lazy_static! {
 #[derive(Clone, Debug)]
 pub struct SystemDUnit {
     pub name: String,
+    pub container_name: Option<String>,
     pub unit_type: UnitTypes,
     pub sections: HashMap>,
 }
@@ -106,6 +107,8 @@ impl SystemDUnit {
     ) -> Result {
         let mut unit = common_properties.clone();
 
+        unit.container_name = Some(String::from(container.name()));
+
         let trimmed_name = match container
             .name()
             .strip_suffix(common_properties.get_type_string())
@@ -190,6 +193,7 @@ impl SystemDUnit {
     pub fn new_from_pod(pod: &Pod, user_mode: bool) -> Result {
         let mut unit = SystemDUnit {
             name: pod.name().to_string(),
+            container_name: None,
             unit_type: UnitTypes::Service,
             sections: Default::default(),
         };

From e2d76e1de5b1148434286aad261468a9f3b37e0f Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Fri, 16 Apr 2021 10:14:16 +0200
Subject: [PATCH 05/12] Channel closed error is ignored

---
 src/provider/mod.rs                           | 15 +++++++++++----
 src/provider/systemdmanager/journal_reader.rs | 14 ++++++--------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index 07ee25e..e502e4c 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -7,7 +7,7 @@ use anyhow::anyhow;
 use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
 use kube::{Api, Client};
 use kubelet::backoff::ExponentialBackoffStrategy;
-use kubelet::log::Sender;
+use kubelet::log::{SendError, Sender};
 use kubelet::node::Builder;
 use kubelet::pod::state::prelude::*;
 use kubelet::pod::{Pod, PodKey};
@@ -215,6 +215,7 @@ impl Provider for StackableProvider {
         mut sender: Sender,
     ) -> anyhow::Result<()> {
         info!("Logs requested");
+
         let handles = self.shared.handles.write().await;
         let pod_handle = handles
             .get(&PodKey::new(&namespace, &pod))
@@ -226,13 +227,19 @@ impl Provider for StackableProvider {
         let invocation_id = container_handle.invocation_id.clone();
 
         task::spawn_blocking(move || {
-            Runtime::new()
+            let result = Runtime::new()
                 .unwrap()
                 .block_on(journal_reader::send_journal_entries(
                     &mut sender,
                     &invocation_id,
-                ))
-                .unwrap();
+                ));
+
+            if let Err(error) = result {
+                match error.downcast_ref::<SendError>() {
+                    Some(SendError::ChannelClosed) => (),
+                    _ => error!("Log could not be sent. {}", error),
+                }
+            }
         });
 
         Ok(())
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index 403359e..7d6e79b 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -1,10 +1,11 @@
+use anyhow::Result;
 use kubelet::log::Sender;
 use std::str;
 use systemd::{journal, journal::JournalRef};
 
 const MAX_LOG_LINE_LENGTH: usize = 16384;
 
-pub async fn send_journal_entries(sender: &mut Sender, invocation_id: &str) -> anyhow::Result<()> {
+pub async fn send_journal_entries(sender: &mut Sender, invocation_id: &str) -> Result<()> {
     let mut journal = journal::OpenOptions::default().open()?;
     let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?;
 
@@ -35,7 +36,7 @@ async fn send_n_messages(
     journal: &mut JournalRef,
     sender: &mut Sender,
     count: usize,
-) -> anyhow::Result<()> {
+) -> Result<()> {
     let mut sent = 0;
     let mut message_available = true;
     while sent != count && message_available {
@@ -49,17 +50,14 @@ async fn send_n_messages(
     Ok(())
 }
 
-async fn send_remaining_messages(
-    journal: &mut JournalRef,
-    sender: &mut Sender,
-) -> anyhow::Result<()> {
+async fn send_remaining_messages(journal: &mut JournalRef, sender: &mut Sender) -> Result<()> {
     while let Some(message) = next_message(journal)? {
         send_message(sender, &message).await?;
     }
     Ok(())
 }
 
-fn next_message(journal: &mut JournalRef) -> anyhow::Result<Option<String>> {
+fn next_message(journal: &mut JournalRef) -> Result<Option<String>> {
     let maybe_message = if journal.next()? != 0 {
         let message = if let Some(entry) = journal.get_data("MESSAGE")? {
             if let Some(value) = entry.value() {
@@ -77,7 +75,7 @@ fn next_message(journal: &mut JournalRef) -> anyhow::Result<Option<String>> {
     Ok(maybe_message)
 }
 
-async fn send_message(sender: &mut Sender, message: &str) -> anyhow::Result<()> {
+async fn send_message(sender: &mut Sender, message: &str) -> Result<()> {
     let mut line = message.to_owned();
     line.push('\n');
     sender.send(line).await?;

From b424c6348033fba4841f00881146561db493a547 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Fri, 16 Apr 2021 19:40:23 +0200
Subject: [PATCH 06/12] Iterating over systemd units replaced with container
 handles (wip)

---
 src/provider/mod.rs                         | 97 +++++++++++++++++---
 src/provider/states/pod.rs                  |  2 -
 src/provider/states/pod/creating_service.rs | 51 ++++++-----
 src/provider/states/pod/running.rs          | 39 ++++----
 src/provider/states/pod/starting.rs         | 98 ++++++++++-----------
 src/provider/states/pod/terminated.rs       | 37 ++++----
 src/provider/systemdmanager/systemdunit.rs  |  4 -
 7 files changed, 203 insertions(+), 125 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index e502e4c..9a45dc4 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -46,24 +46,83 @@ mod repository;
 mod states;
 mod systemdmanager;
 
+#[derive(Clone, Debug)]
 pub struct ContainerHandle {
-    invocation_id: String,
+    pub service_unit: String,
+    pub invocation_id: Option<String>,
 }
 
-pub struct PodHandle {
-    containers: ContainerMap<ContainerHandle>,
+impl ContainerHandle {
+    pub fn new(service_unit: &str) -> Self {
+        ContainerHandle {
+            service_unit: String::from(service_unit),
+            invocation_id: None,
+        }
+    }
 }
 
+type PodHandle = ContainerMap<ContainerHandle>;
 type PodHandleMap = HashMap<PodKey, PodHandle>;
 
 /// Provider-level state shared between all pods
 #[derive(Clone)]
 pub struct ProviderState {
+    // TODO Change to Arc.RwLock; Compare with wasi-provider!!!
     handles: Arc<RwLock<PodHandleMap>>,
     client: Client,
     systemd_manager: Arc<SystemdManager>,
 }
 
+// TODO change to impl PodHandleMap
+impl ProviderState {
+    pub fn insert_container_handle(
+        &mut self,
+        pod_key: &PodKey,
+        container_key: &ContainerKey,
+        service_unit: &str,
+    ) {
+        self.handles
+            .entry(pod_key.to_owned())
+            .or_insert_with(ContainerMap::new)
+            .insert(container_key.to_owned(), ContainerHandle::new(service_unit));
+        info!("Handles inserted: {:?}", self.handles);
+    }
+
+    pub fn set_invocation_id(
+        &mut self,
+        pod_key: &PodKey,
+        container_key: &ContainerKey,
+        invocation_id: &str,
+    ) -> anyhow::Result<()> {
+        if let Some(mut container_handle) = self.container_handle_mut(pod_key, container_key) {
+            container_handle.invocation_id = Some(String::from(invocation_id));
+            Ok(())
+        } else {
+            Err(anyhow!("Container handle not found"))
+        }
+    }
+
+    pub fn container_handle(
+        &self,
+        pod_key: &PodKey,
+        container_key: &ContainerKey,
+    ) -> Option<&ContainerHandle> {
+        self.handles
+            .get(pod_key)
+            .and_then(|pod_handle| pod_handle.get(container_key))
+    }
+
+    fn container_handle_mut(
+        &mut self,
+        pod_key: &PodKey,
+        container_key: &ContainerKey,
+    ) -> Option<&mut ContainerHandle> {
+        self.handles
+            .get_mut(pod_key)
+            .and_then(|pod_handle| pod_handle.get_mut(container_key))
+    }
+}
+
 impl StackableProvider {
     pub async fn new(
         client: Client,
@@ -203,7 +262,6 @@ impl Provider for StackableProvider {
             service_name,
             service_uid,
             package,
-            service_units: None,
         })
     }
 
@@ -216,15 +274,28 @@ impl Provider for StackableProvider {
     ) -> anyhow::Result<()> {
         info!("Logs requested");
 
-        let handles = self.shared.handles.write().await;
-        let pod_handle = handles
-            .get(&PodKey::new(&namespace, &pod))
-            .ok_or_else(|| anyhow!("Pod [{:?}] not found", pod))?;
-        let container_handle = pod_handle
-            .containers
-            .get(&ContainerKey::App(container.clone()))
-            .ok_or_else(|| anyhow!("Container [{:?}] not found", container))?;
-        let invocation_id = container_handle.invocation_id.clone();
+        info!("Shared state handles: {:?}", self.shared.handles);
+
+        let pod_key = PodKey::new(&namespace, &pod);
+        let container_key = ContainerKey::App(container);
+        let container_handle = self
+            .shared
+            .container_handle(&pod_key, &container_key)
+            .ok_or_else(|| {
+                anyhow!(
+                    "Container handle for pod [{:?}] and container [{:?}] not found",
+                    pod_key,
+                    container_key
+                )
+            })?;
+        let invocation_id = container_handle.invocation_id.to_owned().ok_or_else(|| {
+            anyhow!(
+                "Invocation ID for container [{}] in pod [{:?}] is unknown. \
+                 The service is probably not started yet.",
+                container_key,
+                pod_key
+            )
+        })?;
 
         task::spawn_blocking(move || {
             let result = Runtime::new()
diff --git a/src/provider/states/pod.rs b/src/provider/states/pod.rs
index 68eeaab..d581447 100644
--- a/src/provider/states/pod.rs
+++ b/src/provider/states/pod.rs
@@ -5,7 +5,6 @@ use kubelet::pod::state::prelude::*;
 use kubelet::pod::{Pod, Status};
 
 use crate::provider::repository::package::Package;
-use crate::provider::systemdmanager::systemdunit::SystemDUnit;
 use crate::provider::ProviderState;
 
 pub(crate) mod creating_config;
@@ -29,7 +28,6 @@ pub struct PodState {
     pub service_name: String,
     pub service_uid: String,
     pub package: Package,
-    pub service_units: Option<Vec<SystemDUnit>>,
 }
 
 impl PodState {
diff --git a/src/provider/states/pod/creating_service.rs b/src/provider/states/pod/creating_service.rs
index cb43cf4..6a53ac1 100644
--- a/src/provider/states/pod/creating_service.rs
+++ b/src/provider/states/pod/creating_service.rs
@@ -1,5 +1,8 @@
 use kubelet::pod::state::prelude::*;
-use kubelet::pod::Pod;
+use kubelet::{
+    container::ContainerKey,
+    pod::{Pod, PodKey},
+};
 use log::{debug, error, info};
 
 use super::setup_failed::SetupFailed;
@@ -16,14 +19,14 @@ pub struct CreatingService;
 impl State<PodState> for CreatingService {
     async fn next(
         self: Box<Self>,
-        provider_state: SharedState<ProviderState>,
+        shared: SharedState<ProviderState>,
         pod_state: &mut PodState,
         pod: Manifest<Pod>,
     ) -> Transition<PodState> {
         let pod = pod.latest();
 
         let systemd_manager = {
-            let provider_state = provider_state.read().await;
+            let provider_state = shared.read().await;
             provider_state.systemd_manager.clone()
         };
 
@@ -67,27 +70,18 @@ impl State<PodState> for CreatingService {
         // Each pod can map to multiple systemd units/services as each container will get its own
         // systemd unit file/service.
         // Map every container from the pod object to a systemdunit
-        let systemd_units: Vec<SystemDUnit> = match pod
-            .containers()
-            .iter()
-            .map(|container| {
-                SystemDUnit::new(
-                    &unit_template,
-                    &service_prefix,
-                    container,
-                    user_mode,
-                    pod_state,
-                )
-            })
-            .collect()
-        {
-            Ok(units) => units,
-            Err(err) => return Transition::Complete(Err(anyhow::Error::from(err))),
-        };
+        for container in &pod.containers() {
+            let unit = match SystemDUnit::new(
+                &unit_template,
+                &service_prefix,
+                &container,
+                user_mode,
+                pod_state,
+            ) {
+                Ok(unit) => unit,
+                Err(err) => return Transition::Complete(Err(anyhow::Error::from(err))),
+            };
 
-        // This will iterate over all systemd units, write the service files to disk and link
-        // the service to systemd.
-        for unit in &systemd_units {
             // Create the service
             // As per ADR005 we currently write the unit files directly in the systemd
             // unit directory (by passing None as [unit_file_path]).
@@ -103,10 +97,19 @@ impl State<PodState> for CreatingService {
                 return Transition::Complete(Err(e));
             }
         }
+
+            {
+                let mut provider_state = shared.write().await;
+                provider_state.insert_container_handle(
+                    &PodKey::from(&pod),
+                    &ContainerKey::App(String::from(container.name())),
+                    &unit.get_name(),
+                )
+            };
+
             // Done for now, if the service was created successfully we are happy
             // Starting and enabling comes in a later state after all service have been createddy
         }
-        pod_state.service_units = Some(systemd_units);
 
         // All services were loaded successfully, otherwise we'd have returned early above
         Transition::next(self, Starting)
diff --git a/src/provider/states/pod/running.rs b/src/provider/states/pod/running.rs
index 82e9b46..19c9c30 100644
--- a/src/provider/states/pod/running.rs
+++ b/src/provider/states/pod/running.rs
@@ -4,13 +4,13 @@ use k8s_openapi::api::core::v1::{
 };
 use krator::ObjectStatus;
 use kubelet::pod::state::prelude::*;
-use kubelet::pod::Pod;
+use kubelet::pod::{Pod, PodKey};
 use log::{debug, info, trace};
 
 use super::failed::Failed;
 use super::installing::Installing;
 use crate::provider::states::make_status_with_containers_and_condition;
-use crate::provider::{PodState, ProviderState};
+use crate::provider::{PodHandle, PodState, ProviderState};
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
 use k8s_openapi::chrono;
 use tokio::time::Duration;
@@ -33,13 +33,22 @@ impl State<PodState> for Running {
     async fn next(
         mut self: Box<Self>,
-        provider_state: SharedState<ProviderState>,
+        shared: SharedState<ProviderState>,
         pod_state: &mut PodState,
-        _pod: Manifest<Pod>,
+        pod: Manifest<Pod>,
     ) -> Transition<PodState> {
-        let systemd_manager = {
-            let provider_state = provider_state.read().await;
-            provider_state.systemd_manager.clone()
+        let pod = pod.latest();
+        let pod_key = &PodKey::from(pod);
+
+        let (systemd_manager, pod_handle) = {
+            let provider_state = shared.read().await;
+            (
+                provider_state.systemd_manager.clone(),
+                provider_state
+                    .handles
+                    .get(&pod_key)
+                    .map(PodHandle::to_owned),
+            )
         };
 
         // We loop here indefinitely and "wake up" periodically to check if the service is still
@@ -57,20 +66,20 @@ impl State<PodState> for Running {
             // Iterate over all units and check their state
             // if the [`service_units`] Option is a None variant, return a failed state
             // as we need to run something otherwise we are not doing anything
-            let systemd_units = match &pod_state.service_units {
-                Some(units) => units,
+            let containers = match &pod_handle {
+                Some(containers) => containers,
                 None => return Transition::Complete(Err(anyhow!(format!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name)))),
             };
 
-            for unit in systemd_units {
-                match systemd_manager.is_running(&unit.get_name()) {
+            for container_handle in containers.values() {
+                match systemd_manager.is_running(&container_handle.service_unit) {
                     Ok(true) => trace!(
                         "Unit [{}] of service [{}] still running ...",
-                        &unit.get_name(),
+                        &container_handle.service_unit,
                         pod_state.service_name
                     ),
                     Ok(false) => {
-                        info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", pod_state.service_name, unit.get_name());
+                        info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", pod_state.service_name, container_handle.service_unit);
                         return Transition::next(
                             self,
                             Failed {
@@ -81,9 +90,7 @@ impl State<PodState> for Running {
                     Err(dbus_error) => {
                         info!(
                             "Error querying ActiveState for Unit [{}] of service [{}]: [{}].",
-                            pod_state.service_name,
-                            unit.get_name(),
-                            dbus_error
+                            pod_state.service_name, container_handle.service_unit, dbus_error
                         );
                         return Transition::Complete(Err(dbus_error));
                     }
diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index ff2edf1..2f93b43 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -1,16 +1,13 @@
 use kubelet::pod::state::prelude::*;
-use kubelet::{
-    container::ContainerKey,
-    pod::{Pod, PodKey},
-};
+use kubelet::pod::{Pod, PodKey};
 
 use super::failed::Failed;
 use super::running::Running;
 use super::setup_failed::SetupFailed;
-use crate::provider::{ContainerHandle, PodHandle, PodState, ProviderState};
+use crate::provider::{PodHandle, PodState, ProviderState};
 use anyhow::anyhow;
 use log::{debug, error, info, warn};
-use std::{collections::HashMap, time::Instant};
+use std::time::Instant;
 use tokio::time::Duration;
 
 #[derive(Default, Debug, TransitionTo)]
@@ -25,19 +22,27 @@ impl State<PodState> for Starting {
         pod_state: &mut PodState,
         pod: Manifest<Pod>,
     ) -> Transition<PodState> {
-        let systemd_manager = {
+        let pod = pod.latest();
+        let pod_key = &PodKey::from(pod);
+
+        let (systemd_manager, pod_handle) = {
             let provider_state = shared.read().await;
-            provider_state.systemd_manager.clone()
+            (
+                provider_state.systemd_manager.clone(),
+                provider_state
+                    .handles
+                    .get(&pod_key)
+                    .map(PodHandle::to_owned),
+            )
         };
 
-        if let Some(systemd_units) = &pod_state.service_units {
-            for unit in systemd_units {
-                match systemd_manager.is_running(&unit.get_name()) {
+        if let Some(containers) = pod_handle {
+            for (container_key, container_handle) in containers {
+                match systemd_manager.is_running(&container_handle.service_unit) {
                     Ok(true) => {
                         debug!(
                             "Unit [{}] for service [{}] already running, nothing to do..",
-                            &unit.get_name(),
-                            &pod_state.service_name
+                            &container_handle.service_unit, &pod_state.service_name
                         );
                         // Skip rest of loop as the service is already running
                         continue;
                     }
                     Err(dbus_error) => {
                         debug!(
                             "Error retrieving activestate of unit [{}] for service [{}]: [{}]",
-                            &unit.get_name(),
-                            &pod_state.service_name,
-                            dbus_error
+                            &container_handle.service_unit, &pod_state.service_name, dbus_error
                         );
                         return Transition::Complete(Err(dbus_error));
                     }
                     _ => { // nothing to do, just keep going
                     }
                 }
-                info!("Starting systemd unit [{}]", unit);
-                if let Err(start_error) = systemd_manager.start(&unit.get_name()) {
+                info!("Starting systemd unit [{}]", container_handle.service_unit);
+                if let Err(start_error) = systemd_manager.start(&container_handle.service_unit) {
                     error!(
                         "Error occurred starting systemd unit [{}]: [{}]",
-                        unit.get_name(),
-                        start_error
+                        container_handle.service_unit, start_error
                     );
                     return Transition::Complete(Err(start_error));
                 }
 
-                info!("Enabling systemd unit [{}]", unit);
-                if let Err(enable_error) = systemd_manager.enable(&unit.get_name()) {
+                info!("Enabling systemd unit [{}]", container_handle.service_unit);
+                if let Err(enable_error) = systemd_manager.enable(&container_handle.service_unit) {
                     error!(
                         "Error occurred starting systemd unit [{}]: [{}]",
-                        unit.get_name(),
-                        enable_error
+                        container_handle.service_unit, enable_error
                     );
                     return Transition::Complete(Err(enable_error));
                 }
@@ -86,48 +87,43 @@ impl State<PodState> for Starting {
                     tokio::time::sleep(Duration::from_secs(1)).await;
                     debug!(
                         "Checking if unit [{}] is still up and running.",
-                        &unit.get_name()
+                        container_handle.service_unit
                     );
                     match systemd_manager.is_running(&container_handle.service_unit) {
                         Ok(true) => debug!(
                             "Service [{}] still running after [{}] seconds",
-                            &unit.get_name(),
+                            &container_handle.service_unit,
                             start_time.elapsed().as_secs()
                         ),
                         Ok(false) => {
-                            return Transition::Complete(Err(anyhow!(format!(
+                            return Transition::Complete(Err(anyhow!(
                                 "Unit [{}] stopped unexpectedly during startup after [{}] seconds.",
-                                &unit.get_name(),
+                                &container_handle.service_unit,
                                 start_time.elapsed().as_secs()
-                            ))))
+                            )))
                         }
                         Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
                     }
                 }
 
                 info!("Creating container handle");
+                let invocation_id =
+                    match systemd_manager.get_invocation_id(&container_handle.service_unit) {
+                        Ok(invocation_id) => invocation_id,
+                        Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
+                    };
+
+                {
+                    let mut provider_state = shared.write().await;
+                    if provider_state
+                        .set_invocation_id(&pod_key, &container_key, &invocation_id)
+                        .is_err()
+                    {
+                        return Transition::Complete(Err(anyhow!(
+                            "Container [{}] in pod [{:?}] not found",
+                            container_key,
+                            pod_key
+                        )));
+                    }
+                }
             }
         }
diff --git a/src/provider/states/pod/terminated.rs b/src/provider/states/pod/terminated.rs
index 5742bef..db08a29 100644
--- a/src/provider/states/pod/terminated.rs
+++ b/src/provider/states/pod/terminated.rs
@@ -1,4 +1,4 @@
-use kubelet::pod::state::prelude::*;
+use kubelet::pod::{state::prelude::*, PodKey};
 use log::{error, info, warn};
 
 use crate::provider::{PodState, ProviderState};
@@ -13,40 +13,47 @@ pub struct Terminated {
 impl State<PodState> for Terminated {
     async fn next(
         self: Box<Self>,
-        provider_state: SharedState<ProviderState>,
+        shared: SharedState<ProviderState>,
         pod_state: &mut PodState,
-        _pod: Manifest<Pod>,
+        pod: Manifest<Pod>,
     ) -> Transition<PodState> {
         info!(
             "Pod {} was terminated, stopping service!",
             &pod_state.service_name
         );
 
-        let systemd_manager = {
-            let provider_state = provider_state.read().await;
-            provider_state.systemd_manager.clone()
+        let pod = pod.latest();
+        let pod_key = &PodKey::from(pod);
+
+        let (systemd_manager, pod_handle) = {
+            let mut provider_state = shared.write().await;
+            (
+                provider_state.systemd_manager.clone(),
+                provider_state.handles.remove(&pod_key),
+            )
         };
 
         // TODO: We need some additional error handling here, wait for the services to actually
         // shut down and try to remove the rest of the services if one fails (tbd, do we want that?)
-        if let Some(systemd_units) = &pod_state.service_units {
-            for unit in systemd_units {
-                info!("Stopping systemd unit [{}]", unit);
-                if let Err(stop_error) = systemd_manager.stop(&unit.get_name()) {
+        if let Some(containers) = pod_handle {
+            for container_handle in containers.values() {
+                info!("Stopping systemd unit [{}]", container_handle.service_unit);
+                if let Err(stop_error) = systemd_manager.stop(&container_handle.service_unit) {
                     error!(
                         "Error occurred stopping systemd unit [{}]: [{}]",
-                        unit.get_name(),
-                        stop_error
+                        container_handle.service_unit, stop_error
                     );
                     return Transition::Complete(Err(stop_error));
                 }
 
                 // Daemon reload is false here, we'll do that once after all units have been removed
-                info!("Removing systemd unit [{}]", &unit);
-                if let Err(remove_error) = systemd_manager.remove_unit(&unit.get_name(), false) {
+                info!("Removing systemd unit [{}]", container_handle.service_unit);
+                if let Err(remove_error) =
+                    systemd_manager.remove_unit(&container_handle.service_unit, false)
+                {
                     error!(
                         "Error occurred removing systemd unit [{}]: [{}]",
-                        unit, remove_error
+                        container_handle.service_unit, remove_error
                     );
                     return Transition::Complete(Err(remove_error));
                 }
diff --git a/src/provider/systemdmanager/systemdunit.rs b/src/provider/systemdmanager/systemdunit.rs
index 0398ae7..2b671f2 100644
--- a/src/provider/systemdmanager/systemdunit.rs
+++ b/src/provider/systemdmanager/systemdunit.rs
@@ -52,7 +52,6 @@ lazy_static! {
 #[derive(Clone, Debug)]
 pub struct SystemDUnit {
     pub name: String,
-    pub container_name: Option<String>,
     pub unit_type: UnitTypes,
     pub sections: HashMap>,
 }
@@ -107,8 +106,6 @@ impl SystemDUnit {
     ) -> Result {
         let mut unit = common_properties.clone();
 
-        unit.container_name = Some(String::from(container.name()));
-
         let trimmed_name = match container
             .name()
             .strip_suffix(common_properties.get_type_string())
@@ -193,7 +190,6 @@ impl SystemDUnit {
     pub fn new_from_pod(pod: &Pod, user_mode: bool) -> Result {
         let mut unit = SystemDUnit {
             name: pod.name().to_string(),
-            container_name: None,
             unit_type: UnitTypes::Service,
             sections: Default::default(),
         };

From 8e45e50333381a200e551694e29a19189d4afbbc Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Mon, 19 Apr 2021 10:02:17 +0200
Subject: [PATCH 07/12] RwLocks fixed

---
 src/provider/mod.rs                         | 23 +++++++++++++------
 src/provider/states/pod/creating_service.rs |  5 +++--
 src/provider/states/pod/running.rs          |  6 ++----
 src/provider/states/pod/starting.rs         | 11 +++++-----
 src/provider/states/pod/terminated.rs       |  5 +++--
 5 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index 9a45dc4..f3f5d79 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -62,19 +62,29 @@ impl ContainerHandle {
     }
 }
 
 type PodHandle = ContainerMap<ContainerHandle>;
-type PodHandleMap = HashMap<PodKey, PodHandle>;
 
 /// Provider-level state shared between all pods
 #[derive(Clone)]
 pub struct ProviderState {
-    // TODO Change to Arc.RwLock; Compare with wasi-provider!!!
     handles: Arc<RwLock<PodHandleMap>>,
     client: Client,
     systemd_manager: Arc<SystemdManager>,
 }
 
-// TODO change to impl PodHandleMap
-impl ProviderState {
+#[derive(Debug, Default)]
+struct PodHandleMap {
+    handles: HashMap<PodKey, PodHandle>,
+}
+
+impl PodHandleMap {
+    pub fn get(&self, pod_key: &PodKey) -> Option<&PodHandle> {
+        self.handles.get(pod_key)
+    }
+
+    pub fn remove(&mut self, pod_key: &PodKey) -> Option<PodHandle> {
+        self.handles.remove(pod_key)
+    }
+
     pub fn insert_container_handle(
         &mut self,
         pod_key: &PodKey,
@@ -276,10 +286,11 @@ impl Provider for StackableProvider {
 
         info!("Shared state handles: {:?}", self.shared.handles);
 
+        let handles = self.shared.handles.read().await;
+
         let pod_key = PodKey::new(&namespace, &pod);
         let container_key = ContainerKey::App(container);
-        let container_handle = self
-            .shared
+        let container_handle = handles
             .container_handle(&pod_key, &container_key)
             .ok_or_else(|| {
                 anyhow!(
diff --git a/src/provider/states/pod/creating_service.rs b/src/provider/states/pod/creating_service.rs
index 6a53ac1..4179c36 100644
--- a/src/provider/states/pod/creating_service.rs
+++ b/src/provider/states/pod/creating_service.rs
@@ -99,8 +99,9 @@ impl State<PodState> for CreatingService {
             }
 
             {
-                let mut provider_state = shared.write().await;
-                provider_state.insert_container_handle(
+                let provider_state = shared.write().await;
+                let mut handles = provider_state.handles.write().await;
+                handles.insert_container_handle(
                     &PodKey::from(&pod),
                     &ContainerKey::App(String::from(container.name())),
                     &unit.get_name(),
diff --git a/src/provider/states/pod/running.rs b/src/provider/states/pod/running.rs
index 19c9c30..dee957d 100644
--- a/src/provider/states/pod/running.rs
+++ b/src/provider/states/pod/running.rs
@@ -42,12 +42,10 @@ impl State<PodState> for Running {
 
         let (systemd_manager, pod_handle) = {
             let provider_state = shared.read().await;
+            let handles = provider_state.handles.read().await;
             (
                 provider_state.systemd_manager.clone(),
-                provider_state
-                    .handles
-                    .get(&pod_key)
-                    .map(PodHandle::to_owned),
+                handles.get(&pod_key).map(PodHandle::to_owned),
             )
         };
 
diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index 2f93b43..a13d2ba 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -27,12 +27,10 @@ impl State<PodState> for Starting {
 
         let (systemd_manager, pod_handle) = {
             let provider_state = shared.read().await;
+            let handles = provider_state.handles.read().await;
             (
                 provider_state.systemd_manager.clone(),
-                provider_state
-                    .handles
-                    .get(&pod_key)
-                    .map(PodHandle::to_owned),
+                handles.get(&pod_key).map(PodHandle::to_owned),
             )
         };
 
@@ -114,8 +112,9 @@ impl State<PodState> for Starting {
                     };
 
                 {
-                    let mut provider_state = shared.write().await;
-                    if provider_state
+                    let provider_state = shared.write().await;
+                    let mut handles = provider_state.handles.write().await;
+                    if handles
                         .set_invocation_id(&pod_key, &container_key, &invocation_id)
                         .is_err()
                     {
diff --git a/src/provider/states/pod/terminated.rs b/src/provider/states/pod/terminated.rs
index db08a29..9557ef0 100644
--- a/src/provider/states/pod/terminated.rs
+++ b/src/provider/states/pod/terminated.rs
@@ -26,10 +26,11 @@ impl State<PodState> for Terminated {
         let pod_key = &PodKey::from(pod);
 
         let (systemd_manager, pod_handle) = {
-            let mut provider_state = shared.write().await;
+            let provider_state = shared.write().await;
+            let mut handles = provider_state.handles.write().await;
             (
                 provider_state.systemd_manager.clone(),
-                provider_state.handles.remove(&pod_key),
+                handles.remove(&pod_key),
             )
         };
 

From beb2184db56ab564502a52c738fe2873a7233b1a Mon Sep 17 00:00:00 2001
From beb2184db56ab564502a52c738fe2873a7233b1a Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Mon, 19 Apr 2021 13:57:38 +0200
Subject: [PATCH 08/12] Invocation IDs are entered for already started service units

---
 src/provider/mod.rs                    |   7 +-
 src/provider/states/pod/starting.rs    | 234 ++++++++++++-------------
 src/provider/systemdmanager/manager.rs |   7 +
 3 files changed, 128 insertions(+), 120 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index f3f5d79..74fc38b 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -108,7 +108,12 @@ impl PodHandleMap {
             container_handle.invocation_id = Some(String::from(invocation_id));
             Ok(())
         } else {
-            Err(anyhow!("Container handle not found"))
+            Err(anyhow!(
+                "Invocation ID could not be stored. Container handle for
+                pod [{:?}] and container [{}] not found",
+                pod_key,
+                container_key
+            ))
         }
     }
diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index a13d2ba..4f14902 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -1,14 +1,19 @@
-use kubelet::pod::state::prelude::*;
-use kubelet::pod::{Pod, PodKey};
-
 use super::failed::Failed;
 use super::running::Running;
 use super::setup_failed::SetupFailed;
-use crate::provider::{PodHandle, PodState, ProviderState};
+use crate::provider::{
+    systemdmanager::manager::SystemdManager, ContainerHandle, PodHandle, PodState, ProviderState,
+};
+
 use anyhow::anyhow;
-use log::{debug, error, info, warn};
+use kubelet::pod::state::prelude::*;
+use kubelet::{
+    container::ContainerKey,
+    pod::{Pod, PodKey},
+};
+use log::{debug, error, info};
 use std::time::Instant;
-use tokio::time::Duration;
+use tokio::time::{self, Duration};

 #[derive(Default, Debug, TransitionTo)]
 #[transition_to(Running, Failed, SetupFailed)]
 pub struct Starting;

 #[async_trait::async_trait]
 impl State<PodState> for Starting {
@@ -23,124 +28,115 @@ impl State<PodState> for Starting {
         pod: Manifest<Pod>,
     ) -> Transition<PodState> {
         let pod = pod.latest();
-        let pod_key = &PodKey::from(pod);
-
-        let (systemd_manager, pod_handle) = {
-            let provider_state = shared.read().await;
-            let handles = provider_state.handles.read().await;
-            (
-                provider_state.systemd_manager.clone(),
-                handles.get(&pod_key).map(PodHandle::to_owned),
-            )
-        };
-
-        if let Some(containers) = pod_handle {
-            for (container_key, container_handle) in containers {
-                match systemd_manager.is_running(&container_handle.service_unit) {
-                    Ok(true) => {
-                        debug!(
-                            "Unit [{}] for service [{}] already running, nothing to do..",
-                            &container_handle.service_unit, &pod_state.service_name
-                        );
-                        // Skip rest of loop as the service is already running
-                        continue;
-                    }
-                    Err(dbus_error) => {
-                        debug!(
-                            "Error retrieving activestate of unit [{}] for service [{}]: [{}]",
-                            &container_handle.service_unit, &pod_state.service_name, dbus_error
-                        );
-                        return Transition::Complete(Err(dbus_error));
-                    }
-                    _ => { // nothing to do, just keep going
-                    }
-                }

-                info!("Starting systemd unit [{}]", container_handle.service_unit);
-                if let Err(start_error) = systemd_manager.start(&container_handle.service_unit) {
-                    error!(
-                        "Error occurred starting systemd unit [{}]: [{}]",
-                        container_handle.service_unit, start_error
-                    );
-                    return Transition::Complete(Err(start_error));
-                }
-
-                info!("Enabling systemd unit [{}]", container_handle.service_unit);
-                if let Err(enable_error) = systemd_manager.enable(&container_handle.service_unit) {
-                    error!(
-                        "Error occurred starting systemd unit [{}]: [{}]",
-                        container_handle.service_unit, enable_error
-                    );
-                    return Transition::Complete(Err(enable_error));
-                }
-
-                let start_time = Instant::now();
-                // TODO: does this need to be configurable, or are we happy with a hard coded value
-                // for now. I've briefly looked at the podspec and couldn't identify a good field
-                // to use for this - also, currently this starts containers (= systemd units) in
-                // order and waits 10 seconds for every unit, so a service with five containers
-                // would take 50 seconds until it reported running - which is totally fine in case
-                // the units actually depend on each other, but a case could be made for waiting
-                // once at the end
-                while start_time.elapsed().as_secs() < 10 {
-                    tokio::time::sleep(Duration::from_secs(1)).await;
-                    debug!(
-                        "Checking if unit [{}] is still up and running.",
-                        container_handle.service_unit
-                    );
-                    match systemd_manager.is_running(&container_handle.service_unit) {
-                        Ok(true) => debug!(
-                            "Service [{}] still running after [{}] seconds",
-                            &container_handle.service_unit,
-                            start_time.elapsed().as_secs()
-                        ),
-                        Ok(false) => {
-                            return Transition::Complete(Err(anyhow!(
-                                "Unit [{}] stopped unexpectedly during startup after [{}] seconds.",
-                                &container_handle.service_unit,
-                                start_time.elapsed().as_secs()
-                            )))
-                        }
-                        Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
-                    }
-                }
-
-                info!("Creating container handle");
-                let invocation_id =
-                    match systemd_manager.get_invocation_id(&container_handle.service_unit) {
-                        Ok(invocation_id) => invocation_id,
-                        Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
-                    };
-
-                {
-                    let provider_state = shared.write().await;
-                    let mut handles = provider_state.handles.write().await;
-                    if handles
-                        .set_invocation_id(&pod_key, &container_key, &invocation_id)
-                        .is_err()
-                    {
-                        return Transition::Complete(Err(anyhow!(
-                            "Container [{}] in pod [{:?}] not found",
-                            container_key,
-                            pod_key
-                        )));
-                    }
-                }
+
+        match start_containers(shared, pod_state, &pod).await {
+            Ok(()) => Transition::next(
+                self,
+                Running {
+                    ..Default::default()
+                },
+            ),
+            Err(error) => {
+                error!("{}", error);
+                Transition::Complete(Err(error))
             }
-        } else {
-            warn!(
-                "No unit definitions found, not starting anything for pod [{}]!",
-                pod_state.service_name
-            );
         }
-
-        Transition::next(
-            self,
-            Running {
-                ..Default::default()
-            },
-        )
     }

     async fn status(&self, _pod_state: &mut PodState, _pod: &Pod) -> anyhow::Result<PodStatus> {
         Ok(make_status(Phase::Pending, &"Starting"))
     }
 }
+
+async fn start_containers(
+    shared: SharedState<ProviderState>,
+    pod_state: &PodState,
+    pod: &Pod,
+) -> anyhow::Result<()> {
+    let pod_key = &PodKey::from(pod);
+
+    let (systemd_manager, pod_handle) = {
+        let provider_state = shared.read().await;
+        let handles = provider_state.handles.read().await;
+        (
+            provider_state.systemd_manager.clone(),
+            handles.get(&pod_key).map(PodHandle::to_owned),
+        )
+    };
+
+    for (container_key, container_handle) in pod_handle.unwrap_or_default() {
+        if systemd_manager.is_running(&container_handle.service_unit)? {
+            debug!(
+                "Unit [{}] for service [{}] already running, nothing to do..",
+                &container_handle.service_unit, &pod_state.service_name
+            );
+        } else {
+            info!("Starting systemd unit [{}]", container_handle.service_unit);
+            systemd_manager.start(&container_handle.service_unit)?;
+
+            info!("Enabling systemd unit [{}]", container_handle.service_unit);
+            systemd_manager.enable(&container_handle.service_unit)?;
+
+            await_startup(&systemd_manager, &container_handle).await?;
+        }
+
+        let invocation_id = systemd_manager.get_invocation_id(&container_handle.service_unit)?;
+        enter_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?;
+    }
+
+    Ok(())
+}
+
+async fn await_startup(
+    systemd_manager: &SystemdManager,
+    container_handle: &ContainerHandle,
+) -> anyhow::Result<()> {
+    let start_time = Instant::now();
+    // TODO: does this need to be configurable, or are we happy with a hard coded value
+    // for now. I've briefly looked at the podspec and couldn't identify a good field
+    // to use for this - also, currently this starts containers (= systemd units) in
+    // order and waits 10 seconds for every unit, so a service with five containers
+    // would take 50 seconds until it reported running - which is totally fine in case
+    // the units actually depend on each other, but a case could be made for waiting
+    // once at the end
+    while start_time.elapsed().as_secs() < 10 {
+        time::sleep(Duration::from_secs(1)).await;
+
+        debug!(
+            "Checking if unit [{}] is still up and running.",
+            container_handle.service_unit
+        );
+
+        if systemd_manager.is_running(&container_handle.service_unit)? {
+            debug!(
+                "Service [{}] still running after [{}] seconds",
+                &container_handle.service_unit,
+                start_time.elapsed().as_secs()
+            );
+        } else {
+            return Err(anyhow!(
+                "Unit [{}] stopped unexpectedly during startup after [{}] seconds.",
+                &container_handle.service_unit,
+                start_time.elapsed().as_secs()
+            ));
+        }
+    }
+
+    Ok(())
+}
+
+async fn enter_invocation_id(
+    shared: SharedState<ProviderState>,
+    pod_key: &PodKey,
+    container_key: &ContainerKey,
+    invocation_id: &str,
+) -> anyhow::Result<()> {
+    debug!(
+        "Set invocation ID [{}] for pod [{:?}] and container [{}].",
+        invocation_id, pod_key, container_key
+    );
+
+    let provider_state = shared.write().await;
+    let mut handles = provider_state.handles.write().await;
+    handles.set_invocation_id(&pod_key, &container_key, &invocation_id)
+}
diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs
index ba08ec2..a8cb255 100644
--- a/src/provider/systemdmanager/manager.rs
+++ b/src/provider/systemdmanager/manager.rs
@@ -376,6 +376,13 @@ impl SystemdManager {
     pub fn is_running(&self, unit: &str) -> Result<bool> {
         self.get_value::<String>(unit, "ActiveState")
             .map(|v| v.as_str() == Some("active"))
+            .map_err(|dbus_error| {
+                anyhow!(
+                    "Error receiving ActiveState of unit [{}]. {}",
{}", + unit, + dbus_error + ) + }) } pub fn get_invocation_id(&self, unit: &str) -> Result { From b646345129519dc5f308a92bbd03eadf2130d255 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Tue, 20 Apr 2021 10:47:14 +0200 Subject: [PATCH 09/12] Code commented and cleaned up --- src/provider/mod.rs | 102 +++++++++++------- src/provider/states/pod/creating_service.rs | 11 +- src/provider/states/pod/running.rs | 12 ++- src/provider/states/pod/starting.rs | 93 ++++++++-------- src/provider/states/pod/terminated.rs | 16 +-- src/provider/systemdmanager/journal_reader.rs | 29 ++++- src/provider/systemdmanager/manager.rs | 15 ++- 7 files changed, 165 insertions(+), 113 deletions(-) diff --git a/src/provider/mod.rs b/src/provider/mod.rs index 74fc38b..757e569 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -15,7 +15,7 @@ use kubelet::{ container::{ContainerKey, ContainerMap}, provider::Provider, }; -use log::{debug, error, info}; +use log::{debug, error}; use tokio::{runtime::Runtime, sync::RwLock, task}; use crate::provider::error::StackableError; @@ -46,23 +46,6 @@ mod repository; mod states; mod systemdmanager; -#[derive(Clone, Debug)] -pub struct ContainerHandle { - pub service_unit: String, - pub invocation_id: Option, -} - -impl ContainerHandle { - pub fn new(service_unit: &str) -> Self { - ContainerHandle { - service_unit: String::from(service_unit), - invocation_id: None, - } - } -} - -type PodHandle = ContainerMap; - /// Provider-level state shared between all pods #[derive(Clone)] pub struct ProviderState { @@ -71,33 +54,42 @@ pub struct ProviderState { systemd_manager: Arc, } +/// Contains handles for running pods. #[derive(Debug, Default)] struct PodHandleMap { handles: HashMap, } impl PodHandleMap { + /// Returns the pod handle for the given key or [`None`] if not found. pub fn get(&self, pod_key: &PodKey) -> Option<&PodHandle> { self.handles.get(pod_key) } + /// Removes the pod handle with the given key and returns it. pub fn remove(&mut self, pod_key: &PodKey) -> Option { self.handles.remove(pod_key) } + /// Inserts a new [`ContainerHandle`] for the given pod and container key. + /// + /// A pod handle is created if not already existent. pub fn insert_container_handle( &mut self, pod_key: &PodKey, container_key: &ContainerKey, - service_unit: &str, + container_handle: &ContainerHandle, ) { self.handles .entry(pod_key.to_owned()) .or_insert_with(ContainerMap::new) - .insert(container_key.to_owned(), ContainerHandle::new(service_unit)); - info!("Handles inserted: {:?}", self.handles); + .insert(container_key.to_owned(), container_handle.to_owned()); } + /// Sets the invocation ID for the given pod and container key. + /// + /// If there is no corresponding container handle then an error is + /// returned. pub fn set_invocation_id( &mut self, pod_key: &PodKey, @@ -117,6 +109,8 @@ impl PodHandleMap { } } + /// Returns a reference to the container handle with the given pod and + /// container key or [`None`] if not found. pub fn container_handle( &self, pod_key: &PodKey, @@ -127,6 +121,8 @@ impl PodHandleMap { .and_then(|pod_handle| pod_handle.get(container_key)) } + /// Returns a mutable reference to the container handle with the given + /// pod and container key or [`None`] if not found. fn container_handle_mut( &mut self, pod_key: &PodKey, @@ -138,6 +134,31 @@ impl PodHandleMap { } } +/// Represents a handle to a running pod. +type PodHandle = ContainerMap; + +/// Represents a handle to a running container. 
From b646345129519dc5f308a92bbd03eadf2130d255 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Tue, 20 Apr 2021 10:47:14 +0200
Subject: [PATCH 09/12] Code commented and cleaned up

---
 src/provider/mod.rs                           | 102 +++++++++++-------
 src/provider/states/pod/creating_service.rs   |  11 +-
 src/provider/states/pod/running.rs            |  12 ++-
 src/provider/states/pod/starting.rs           |  93 ++++++++--------
 src/provider/states/pod/terminated.rs         |  16 +--
 src/provider/systemdmanager/journal_reader.rs |  29 ++++-
 src/provider/systemdmanager/manager.rs        |  15 ++-
 7 files changed, 165 insertions(+), 113 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index 74fc38b..757e569 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -15,7 +15,7 @@ use kubelet::{
     container::{ContainerKey, ContainerMap},
     provider::Provider,
 };
-use log::{debug, error, info};
+use log::{debug, error};
 use tokio::{runtime::Runtime, sync::RwLock, task};

 use crate::provider::error::StackableError;
@@ -46,23 +46,6 @@ mod repository;
 mod states;
 mod systemdmanager;

-#[derive(Clone, Debug)]
-pub struct ContainerHandle {
-    pub service_unit: String,
-    pub invocation_id: Option<String>,
-}
-
-impl ContainerHandle {
-    pub fn new(service_unit: &str) -> Self {
-        ContainerHandle {
-            service_unit: String::from(service_unit),
-            invocation_id: None,
-        }
-    }
-}
-
-type PodHandle = ContainerMap<ContainerHandle>;
-
 /// Provider-level state shared between all pods
 #[derive(Clone)]
 pub struct ProviderState {
@@ -71,33 +54,42 @@ pub struct ProviderState {
     systemd_manager: Arc<SystemdManager>,
 }

+/// Contains handles for running pods.
 #[derive(Debug, Default)]
 struct PodHandleMap {
     handles: HashMap<PodKey, PodHandle>,
 }

 impl PodHandleMap {
+    /// Returns the pod handle for the given key or [`None`] if not found.
     pub fn get(&self, pod_key: &PodKey) -> Option<&PodHandle> {
         self.handles.get(pod_key)
     }

+    /// Removes the pod handle with the given key and returns it.
     pub fn remove(&mut self, pod_key: &PodKey) -> Option<PodHandle> {
         self.handles.remove(pod_key)
     }

+    /// Inserts a new [`ContainerHandle`] for the given pod and container key.
+    ///
+    /// A pod handle is created if not already existent.
     pub fn insert_container_handle(
         &mut self,
         pod_key: &PodKey,
         container_key: &ContainerKey,
-        service_unit: &str,
+        container_handle: &ContainerHandle,
     ) {
         self.handles
             .entry(pod_key.to_owned())
             .or_insert_with(ContainerMap::new)
-            .insert(container_key.to_owned(), ContainerHandle::new(service_unit));
-        info!("Handles inserted: {:?}", self.handles);
+            .insert(container_key.to_owned(), container_handle.to_owned());
     }

+    /// Sets the invocation ID for the given pod and container key.
+    ///
+    /// If there is no corresponding container handle then an error is
+    /// returned.
     pub fn set_invocation_id(
         &mut self,
         pod_key: &PodKey,
@@ -117,6 +109,8 @@ impl PodHandleMap {
         }
     }

+    /// Returns a reference to the container handle with the given pod and
+    /// container key or [`None`] if not found.
     pub fn container_handle(
         &self,
         pod_key: &PodKey,
@@ -127,6 +121,8 @@ impl PodHandleMap {
             .and_then(|pod_handle| pod_handle.get(container_key))
     }

+    /// Returns a mutable reference to the container handle with the given
+    /// pod and container key or [`None`] if not found.
     fn container_handle_mut(
         &mut self,
         pod_key: &PodKey,
@@ -138,6 +134,31 @@ impl PodHandleMap {
     }
 }

+/// Represents a handle to a running pod.
+type PodHandle = ContainerMap<ContainerHandle>;
+
+/// Represents a handle to a running container.
+#[derive(Clone, Debug)]
+pub struct ContainerHandle {
+    /// Contains the name of the corresponding service unit.
+    /// Can be used as reference in [`crate::provider::systemdmanager::manager`].
+    pub service_unit: String,

+    /// Contains the systemd invocation ID which identifies the
+    /// corresponding entries in the journal.
+    pub invocation_id: Option<String>,
+}
+
+impl ContainerHandle {
+    /// Creates an instance with the given service unit name.
+    pub fn new(service_unit: &str) -> Self {
+        ContainerHandle {
+            service_unit: String::from(service_unit),
+            invocation_id: None,
+        }
+    }
+}
+
 impl StackableProvider {
     pub async fn new(
         client: Client,
@@ -287,27 +308,29 @@ impl Provider for StackableProvider {
         container: String,
         mut sender: Sender,
     ) -> anyhow::Result<()> {
-        info!("Logs requested");
-
-        info!("Shared state handles: {:?}", self.shared.handles);
-
-        let handles = self.shared.handles.read().await;
+        debug!("Logs requested");

         let pod_key = PodKey::new(&namespace, &pod);
         let container_key = ContainerKey::App(container);
+
+        let maybe_container_handle = {
+            let handles = self.shared.handles.read().await;
+            handles
+                .container_handle(&pod_key, &container_key)
+                .map(ContainerHandle::to_owned)
+        };
+
+        let container_handle = maybe_container_handle.ok_or_else(|| {
+            anyhow!(
+                "Container handle for pod [{:?}] and container [{:?}] not found",
+                pod_key,
+                container_key
+            )
+        })?;
+        let invocation_id = container_handle.invocation_id.ok_or_else(|| {
             anyhow!(
                 "Invocation ID for container [{}] in pod [{:?}] is unknown. \
-                The service is probably not started yet.",
+            The service is probably not started yet.",
                 container_key,
                 pod_key
             )
         })?;

         task::spawn_blocking(move || {
             let result = Runtime::new()
                 .unwrap()
-                .block_on(journal_reader::send_journal_entries(
-                    &mut sender,
-                    &invocation_id,
-                ));
+                .block_on(journal_reader::send_messages(&mut sender, &invocation_id));

             if let Err(error) = result {
                 match error.downcast_ref::<SendError>() {
diff --git a/src/provider/states/pod/creating_service.rs b/src/provider/states/pod/creating_service.rs
index 4179c36..da887b0 100644
--- a/src/provider/states/pod/creating_service.rs
+++ b/src/provider/states/pod/creating_service.rs
@@ -8,7 +8,8 @@ use log::{debug, error, info};
 use super::setup_failed::SetupFailed;
 use super::starting::Starting;
 use crate::provider::systemdmanager::systemdunit::SystemDUnit;
-use crate::provider::{PodState, ProviderState};
+use crate::provider::{ContainerHandle, PodState, ProviderState};
+use anyhow::Error;
 use std::fs::create_dir_all;

 #[derive(Default, Debug, TransitionTo)]
@@ -42,7 +43,7 @@ impl State<PodState> for CreatingService {
             pod_state.service_name, service_directory
         );
         if let Err(error) = create_dir_all(service_directory) {
-            return Transition::Complete(Err(anyhow::Error::from(error)));
+            return Transition::Complete(Err(Error::from(error)));
         }
     }
@@ -63,7 +64,7 @@ impl State<PodState> for CreatingService {
                 "Unable to create systemd unit template from pod [{}]: [{}]",
                 service_name, pod_error
             );
-            return Transition::Complete(Err(anyhow::Error::from(pod_error)));
+            return Transition::Complete(Err(Error::from(pod_error)));
         }
     };
@@ -79,7 +80,7 @@ impl State<PodState> for CreatingService {
             pod_state,
         ) {
             Ok(unit) => unit,
-            Err(err) => return Transition::Complete(Err(anyhow::Error::from(err))),
+            Err(err) => return Transition::Complete(Err(Error::from(err))),
         };

         // Create the service
@@ -104,7 +105,7 @@ impl State<PodState> for CreatingService {
             handles.insert_container_handle(
                 &PodKey::from(&pod),
                 &ContainerKey::App(String::from(container.name())),
-                &unit.get_name(),
+                &ContainerHandle::new(&unit.get_name()),
             )
         };
diff --git a/src/provider/states/pod/running.rs b/src/provider/states/pod/running.rs
index dee957d..e32fe84 100644
--- a/src/provider/states/pod/running.rs
+++ b/src/provider/states/pod/running.rs
@@ -66,18 +66,20 @@ impl State<PodState> for Running {
         // as we need to run something otherwise we are not doing anything
         let containers = match &pod_handle {
             Some(containers) => containers,
-            None => return Transition::Complete(Err(anyhow!(format!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name)))),
+            None => return Transition::Complete(Err(anyhow!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name))),
         };

         for container_handle in containers.values() {
-            match systemd_manager.is_running(&container_handle.service_unit) {
+            let service_unit = &container_handle.service_unit;
+
+            match systemd_manager.is_running(&service_unit) {
                 Ok(true) => trace!(
                     "Unit [{}] of service [{}] still running ...",
-                    &container_handle.service_unit,
+                    service_unit,
                     pod_state.service_name
                 ),
                 Ok(false) => {
-                    info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", pod_state.service_name, container_handle.service_unit);
+                    info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", service_unit, pod_state.service_name);
                     return Transition::next(
                         self,
                         Failed {
@@ -88,7 +90,7 @@ impl State<PodState> for Running {
                 Err(dbus_error) => {
                     info!(
                         "Error querying ActiveState for Unit [{}] of service [{}]: [{}].",
-                        pod_state.service_name, container_handle.service_unit, dbus_error
+                        service_unit, pod_state.service_name, dbus_error
                     );
                     return Transition::Complete(Err(dbus_error));
                 }
diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs
index 4f14902..8a6dad2 100644
--- a/src/provider/states/pod/starting.rs
+++ b/src/provider/states/pod/starting.rs
@@ -1,11 +1,9 @@
-use super::failed::Failed;
 use super::running::Running;
-use super::setup_failed::SetupFailed;
 use crate::provider::{
-    systemdmanager::manager::SystemdManager, ContainerHandle, PodHandle, PodState, ProviderState,
+    systemdmanager::manager::SystemdManager, PodHandle, PodState, ProviderState,
 };

-use anyhow::anyhow;
+use anyhow::{anyhow, Result};
 use kubelet::pod::state::prelude::*;
 use kubelet::{
     container::ContainerKey,
     pod::{Pod, PodKey},
 };
@@ -16,7 +14,7 @@
 use std::time::Instant;
 use tokio::time::{self, Duration};

 #[derive(Default, Debug, TransitionTo)]
-#[transition_to(Running, Failed, SetupFailed)]
+#[transition_to(Running)]
 pub struct Starting;

 #[async_trait::async_trait]
@@ -29,13 +27,8 @@ impl State<PodState> for Starting {
     ) -> Transition<PodState> {
         let pod = pod.latest();

-        match start_containers(shared, pod_state, &pod).await {
-            Ok(()) => Transition::next(
-                self,
-                Running {
-                    ..Default::default()
-                },
-            ),
+        match start_service_units(shared, pod_state, &pod).await {
+            Ok(()) => Transition::next(self, Running::default()),
             Err(error) => {
                 error!("{}", error);
                 Transition::Complete(Err(error))
@@ -43,16 +36,21 @@ impl State<PodState> for Starting {
             }
         }
     }

-    async fn status(&self, _pod_state: &mut PodState, _pod: &Pod) -> anyhow::Result<PodStatus> {
-        Ok(make_status(Phase::Pending, &"Starting"))
+    async fn status(&self, _pod_state: &mut PodState, _pod: &Pod) -> Result<PodStatus> {
+        Ok(make_status(Phase::Pending, "Starting"))
     }
 }

-async fn start_containers(
+/// Starts the service units for the containers of the given pod.
+///
+/// The units are started and enabled if they are not already running.
+/// The startup is considered successful if the unit is still running
+/// after 10 seconds.
+async fn start_service_units(
     shared: SharedState<ProviderState>,
     pod_state: &PodState,
     pod: &Pod,
-) -> anyhow::Result<()> {
+) -> Result<()> {
     let pod_key = &PodKey::from(pod);

     let (systemd_manager, pod_handle) = {
@@ -65,58 +63,62 @@ async fn start_service_units(
     };

     for (container_key, container_handle) in pod_handle.unwrap_or_default() {
-        if systemd_manager.is_running(&container_handle.service_unit)? {
+        let service_unit = &container_handle.service_unit;
+
+        if systemd_manager.is_running(service_unit)? {
             debug!(
-                "Unit [{}] for service [{}] already running, nothing to do..",
-                &container_handle.service_unit, &pod_state.service_name
+                "Unit [{}] for service [{}] is already running. Skip startup.",
+                service_unit, &pod_state.service_name
             );
         } else {
-            info!("Starting systemd unit [{}]", container_handle.service_unit);
-            systemd_manager.start(&container_handle.service_unit)?;
-
-            info!("Enabling systemd unit [{}]", container_handle.service_unit);
-            systemd_manager.enable(&container_handle.service_unit)?;
-
-            await_startup(&systemd_manager, &container_handle).await?;
+            info!("Starting systemd unit [{}]", service_unit);
+            systemd_manager.start(service_unit)?;
+
+            info!("Enabling systemd unit [{}]", service_unit);
+            systemd_manager.enable(service_unit)?;
+
+            // TODO: does this need to be configurable, or are we happy with a hard coded value
+            // for now. I've briefly looked at the podspec and couldn't identify a good field
+            // to use for this - also, currently this starts containers (= systemd units) in
+            // order and waits 10 seconds for every unit, so a service with five containers
+            // would take 50 seconds until it reported running - which is totally fine in case
+            // the units actually depend on each other, but a case could be made for waiting
+            // once at the end
+            await_startup(&systemd_manager, service_unit, Duration::from_secs(10)).await?;
         }

-        let invocation_id = systemd_manager.get_invocation_id(&container_handle.service_unit)?;
-        enter_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?;
+        let invocation_id = systemd_manager.get_invocation_id(service_unit)?;
+        store_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?;
     }

     Ok(())
 }

+/// Checks if the given service unit is still running after the given duration.
 async fn await_startup(
     systemd_manager: &SystemdManager,
-    container_handle: &ContainerHandle,
-) -> anyhow::Result<()> {
+    service_unit: &str,
+    duration: Duration,
+) -> Result<()> {
     let start_time = Instant::now();
-    // TODO: does this need to be configurable, or are we happy with a hard coded value
-    // for now. I've briefly looked at the podspec and couldn't identify a good field
-    // to use for this - also, currently this starts containers (= systemd units) in
-    // order and waits 10 seconds for every unit, so a service with five containers
-    // would take 50 seconds until it reported running - which is totally fine in case
-    // the units actually depend on each other, but a case could be made for waiting
-    // once at the end
-    while start_time.elapsed().as_secs() < 10 {
+    while start_time.elapsed() < duration {
         time::sleep(Duration::from_secs(1)).await;

         debug!(
             "Checking if unit [{}] is still up and running.",
-            container_handle.service_unit
+            service_unit
         );

-        if systemd_manager.is_running(&container_handle.service_unit)? {
+        if systemd_manager.is_running(service_unit)? {
             debug!(
                 "Service [{}] still running after [{}] seconds",
-                &container_handle.service_unit,
+                service_unit,
                 start_time.elapsed().as_secs()
             );
         } else {
             return Err(anyhow!(
                 "Unit [{}] stopped unexpectedly during startup after [{}] seconds.",
-                &container_handle.service_unit,
+                service_unit,
                 start_time.elapsed().as_secs()
             ));
         }
@@ -125,12 +127,13 @@ async fn await_startup(
     Ok(())
 }

-async fn enter_invocation_id(
+/// Stores the given invocation ID into the corresponding container handle.
+async fn store_invocation_id(
     shared: SharedState<ProviderState>,
     pod_key: &PodKey,
     container_key: &ContainerKey,
     invocation_id: &str,
-) -> anyhow::Result<()> {
+) -> Result<()> {
     debug!(
         "Set invocation ID [{}] for pod [{:?}] and container [{}].",
         invocation_id, pod_key, container_key
     );

     let provider_state = shared.write().await;
     let mut handles = provider_state.handles.write().await;
-    handles.set_invocation_id(&pod_key, &container_key, &invocation_id)
+    handles.set_invocation_id(&pod_key, &container_key, invocation_id)
 }
diff --git a/src/provider/states/pod/terminated.rs b/src/provider/states/pod/terminated.rs
index 9557ef0..1b243f1 100644
--- a/src/provider/states/pod/terminated.rs
+++ b/src/provider/states/pod/terminated.rs
@@ -38,23 +38,23 @@ impl State<PodState> for Terminated {
         // shut down and try to remove the rest of the services if one fails (tbd, do we want that?)
         if let Some(containers) = pod_handle {
             for container_handle in containers.values() {
-                info!("Stopping systemd unit [{}]", container_handle.service_unit);
-                if let Err(stop_error) = systemd_manager.stop(&container_handle.service_unit) {
+                let service_unit = &container_handle.service_unit;
+
+                info!("Stopping systemd unit [{}]", service_unit);
+                if let Err(stop_error) = systemd_manager.stop(service_unit) {
                     error!(
                         "Error occurred stopping systemd unit [{}]: [{}]",
-                        container_handle.service_unit, stop_error
+                        service_unit, stop_error
                     );
                     return Transition::Complete(Err(stop_error));
                 }

                 // Daemon reload is false here, we'll do that once after all units have been removed
-                info!("Removing systemd unit [{}]", container_handle.service_unit);
-                if let Err(remove_error) =
-                    systemd_manager.remove_unit(&container_handle.service_unit, false)
-                {
+                info!("Removing systemd unit [{}]", service_unit);
+                if let Err(remove_error) = systemd_manager.remove_unit(service_unit, false) {
                     error!(
                         "Error occurred removing systemd unit [{}]: [{}]",
-                        container_handle.service_unit, remove_error
+                        service_unit, remove_error
                     );
                     return Transition::Complete(Err(remove_error));
                 }
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index 7d6e79b..786515f 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -1,11 +1,23 @@
-use anyhow::Result;
+//! This module provides functions for reading from the journal.
+
+use anyhow::{Error, Result};
 use kubelet::log::Sender;
 use std::str;
 use systemd::{journal, journal::JournalRef};

+/// Length at which log lines are truncated; this number is arbitrarily chosen.
 const MAX_LOG_LINE_LENGTH: usize = 16384;

-pub async fn send_journal_entries(sender: &mut Sender, invocation_id: &str) -> Result<()> {
+/// Reads journal entries with the given invocation ID and sends the
+/// contained messages.
+///
+/// The options `tail` and `follow` in [`sender`] are taken into account.
+/// If `follow` is `true` then messages are sent until the channel of
+/// [`sender`] is closed. In this case an
+/// [`Err(kubelet::log::SendError::ChannelClosed)`] will be returned.
+///
+/// Messages longer than [`MAX_LOG_LINE_LENGTH`] are truncated.
+pub async fn send_messages(sender: &mut Sender, invocation_id: &str) -> Result<()> {
     let mut journal = journal::OpenOptions::default().open()?;
     let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?;
@@ -32,6 +44,7 @@ pub async fn send_messages(sender: &mut Sender, invocation_id: &str) -> R
     Ok(())
 }

+/// Sends the given number of messages from the journal.
 async fn send_n_messages(
     journal: &mut JournalRef,
     sender: &mut Sender,
@@ -50,6 +63,7 @@ async fn send_n_messages(
     Ok(())
 }

+/// Sends the remaining messages from the journal.
 async fn send_remaining_messages(journal: &mut JournalRef, sender: &mut Sender) -> Result<()> {
     while let Some(message) = next_message(journal)? {
         send_message(sender, &message).await?;
     }
     Ok(())
 }

+/// Retrieves the message of the next entry from the journal.
+///
+/// Returns [`Ok(Some(message))`] if a message could be successfully retrieved
+/// and advances the position in the journal. If the journal entry has no
+/// message assigned then `message` is an empty string.
+/// Returns [`Ok(None)`] if there are no new entries.
+/// Returns [`Err(error)`] if the journal could not be read.
 fn next_message(journal: &mut JournalRef) -> Result<Option<String>> {
     let maybe_message = if journal.next()? != 0 {
         let message = if let Some(entry) = journal.get_data("MESSAGE")? {
@@ -75,9 +96,9 @@ fn next_message(journal: &mut JournalRef) -> Result<Option<String>> {
     Ok(maybe_message)
 }

+/// Sends the given message with a newline character.
 async fn send_message(sender: &mut Sender, message: &str) -> Result<()> {
     let mut line = message.to_owned();
     line.push('\n');
-    sender.send(line).await?;
-    Ok(())
+    sender.send(line).await.map_err(Error::new)
 }
diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs
index a8cb255..918a5ef 100644
--- a/src/provider/systemdmanager/manager.rs
+++ b/src/provider/systemdmanager/manager.rs
@@ -373,7 +373,8 @@ impl SystemdManager {
         }
     }

-    pub fn is_running(&self, unit: &str) -> Result<bool> {
+    /// Checks if the ActiveState of the given unit is set to active.
+    pub fn is_running(&self, unit: &str) -> anyhow::Result<bool> {
         self.get_value::<String>(unit, "ActiveState")
             .map(|v| v.as_str() == Some("active"))
             .map_err(|dbus_error| {
@@ -385,16 +386,18 @@ impl SystemdManager {
             })
     }

-    pub fn get_invocation_id(&self, unit: &str) -> Result<String> {
+    /// Retrieves the invocation ID for the given unit.
+    pub fn get_invocation_id(&self, unit: &str) -> anyhow::Result<String> {
         self.get_value::<Vec<u8>>(unit, "InvocationID")
             .map(|Variant(vec)| vec.iter().map(|byte| format!("{:02x}", byte)).collect())
     }

+    /// Retrieves the value for the given property of the given unit.
     pub fn get_value<T: for<'a> Get<'a>>(
         &self,
         unit: &str,
         property: &str,
-    ) -> Result<Variant<T>, anyhow::Error> {
+    ) -> anyhow::Result<Variant<T>> {
         // We are using `LoadUnit` here, as GetUnit can fail seemingly at random, when the unit
         // is not loaded due to systemd garbage collection.
         // see https://github.com/systemd/systemd/issues/1929 for more information
@@ -406,13 +409,15 @@ impl SystemdManager {
             .connection
             .with_proxy(SYSTEMD_DESTINATION, &unit_node, self.timeout);

-        Ok(proxy
+        let value = proxy
             .method_call(
                 DBUS_PROPERTIES_INTERFACE,
                 "Get",
                 ("org.freedesktop.systemd1.Unit", property),
             )
-            .map(|r: (Variant<T>,)| r.0)?)
+            .map(|r: (Variant<T>,)| r.0)?;
+
+        Ok(value)
     }

     // Symlink a unit file into the systemd unit folder
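Patch 09 leaves the journal_reader module in roughly its final shape. Stripped of the tail/follow handling and of the kubelet `Sender` plumbing, its core read loop reduces to the stand-alone sketch below. It assumes the `systemd` crate pinned in Cargo.toml (with its default journal support) and a made-up invocation ID; only calls that appear in the patches themselves are used:

```rust
use anyhow::Result;
use systemd::journal;

fn main() -> Result<()> {
    // Hypothetical invocation ID; the agent obtains the real one over D-Bus.
    let invocation_id = "4b1d00a35e9f427c8d112f6073ee05c9";

    // Open the journal and restrict iteration to entries whose
    // _SYSTEMD_INVOCATION_ID field matches, as send_messages does.
    let mut journal = journal::OpenOptions::default().open()?;
    let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?;

    // `next` returns 0 once the end of the journal is reached.
    while journal.next()? != 0 {
        if let Some(entry) = journal.get_data("MESSAGE")? {
            if let Some(value) = entry.value() {
                // Journal fields are raw bytes; tolerate invalid UTF-8.
                println!("{}", String::from_utf8_lossy(value));
            }
        }
    }

    Ok(())
}
```

Because the match is on the invocation ID rather than on the unit name, a unit restart cannot leak log lines from a previous run into the current stream.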
From 44fe44a318a449f84e74e0fb96913a48c18409b2 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Thu, 22 Apr 2021 15:13:50 +0200
Subject: [PATCH 10/12] tail_lines also shows the first entry if requested

---
 src/provider/systemdmanager/journal_reader.rs | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index 786515f..63e13d3 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -5,9 +5,6 @@ use kubelet::log::Sender;
 use std::str;
 use systemd::{journal, journal::JournalRef};

-/// Length at which log lines are truncated; this number is arbitrarily chosen.
-const MAX_LOG_LINE_LENGTH: usize = 16384;
-
 /// Reads journal entries with the given invocation ID and sends the
 /// contained messages.
 ///
 /// The options `tail` and `follow` in [`sender`] are taken into account.
 /// If `follow` is `true` then messages are sent until the channel of
 /// [`sender`] is closed. In this case an
 /// [`Err(kubelet::log::SendError::ChannelClosed)`] will be returned.
-///
-/// Messages longer than [`MAX_LOG_LINE_LENGTH`] are truncated.
 pub async fn send_messages(sender: &mut Sender, invocation_id: &str) -> Result<()> {
     let mut journal = journal::OpenOptions::default().open()?;
     let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?;

-    journal.set_data_threshold(MAX_LOG_LINE_LENGTH)?;
-
     if let Some(line_count) = sender.tail() {
         journal.seek_tail()?;
-        journal.previous_skip(line_count as u64 + 1)?;
+        let skipped = journal.previous_skip(line_count as u64 + 1)?;
+        if skipped < line_count + 1 {
+            journal.seek_head()?;
+        }

         if sender.follow() {
             send_remaining_messages(journal, sender).await?;
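Patch 10 fixes the tail behaviour: skipping back `line_count + 1` entries from the tail only works when the journal actually holds that many entries, so the code now falls back to `seek_head` when fewer entries were skipped, and the very first entry is included in the output. The arithmetic is easiest to check against a plain `Vec` standing in for the journal; `seek_backwards` below is a hypothetical model of the cursor movement, not the real journal API:

```rust
// Models seek_tail + previous_skip + seek_head as an index computation over
// a Vec: returns the index of the first entry a tail of `count` should show.
fn seek_backwards(len: usize, count: usize) -> usize {
    // Skipping back `count + 1` entries from the tail lands just before the
    // last `count` entries ...
    let entries_to_skip = count + 1;
    let skipped = entries_to_skip.min(len);
    if skipped < entries_to_skip {
        // ... and if fewer entries could be skipped, the beginning of the
        // journal was reached: fall back to the head (seek_head).
        0
    } else {
        len - count
    }
}

fn main() {
    let journal = vec!["line1", "line2", "line3"];

    // Tail of 2 from 3 entries: start at index 1, i.e. the last two lines.
    assert_eq!(seek_backwards(journal.len(), 2), 1);

    // Tail of 5 from 3 entries: fall back to the head so that all three
    // lines - including the very first one - are shown.
    assert_eq!(seek_backwards(journal.len(), 5), 0);

    for line in &journal[seek_backwards(journal.len(), 5)..] {
        println!("{}", line);
    }
}
```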
From fc6fe5f6144d6d402980ffa32902b7d9131ca3d9 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Thu, 22 Apr 2021 15:39:20 +0200
Subject: [PATCH 11/12] libsystemd-dev added to GitHub workflow

---
 .github/workflows/rust.yml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 40fc125..26bfa68 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -15,7 +15,7 @@ jobs:
     steps:
       - uses: actions/checkout@v2
       - run: rustup update stable && rustup default stable
-      - run: sudo apt-get install libdbus-1-dev pkg-config libdbus-1-3
+      - run: sudo apt-get install libdbus-1-dev libsystemd-dev pkg-config libdbus-1-3
      - run: rustup component add rustfmt
       - run: cargo fmt --all -- --check

@@ -23,7 +23,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Install dbus dependencies
-        run: sudo apt-get install libdbus-1-dev pkg-config libdbus-1-3
+        run: sudo apt-get install libdbus-1-dev libsystemd-dev pkg-config libdbus-1-3
       - uses: actions/checkout@v1
       - uses: actions-rs/toolchain@v1
         with:
@@ -40,7 +40,7 @@ jobs:
     steps:
       - uses: actions/checkout@v2
       - name: Install dbus dependencies
-        run: sudo apt-get install libdbus-1-dev pkg-config libdbus-1-3
+        run: sudo apt-get install libdbus-1-dev libsystemd-dev pkg-config libdbus-1-3
       - name: Build
         run: cargo build --verbose
       - name: Run tests
@@ -53,4 +53,4 @@ jobs:
       - uses: actions/checkout@v2
       - uses: actions-rs/audit-check@v1
         with:
-          token: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
+          token: ${{ secrets.GITHUB_TOKEN }}

From c1c3d839543c173d33832185c72600b644bff2be Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Wed, 28 Apr 2021 16:19:54 +0200
Subject: [PATCH 12/12] Code commented more clearly

---
 src/provider/mod.rs                           | 15 ++++++--
 src/provider/systemdmanager/journal_reader.rs | 34 +++++++++++++++----
 2 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/src/provider/mod.rs b/src/provider/mod.rs
index 757e569..a5c2520 100644
--- a/src/provider/mod.rs
+++ b/src/provider/mod.rs
@@ -55,6 +55,14 @@ pub struct ProviderState {
 }

 /// Contains handles for running pods.
+///
+/// A `PodHandleMap` maps a pod key to a pod handle, which in turn is a map
+/// from a container key to a container handle.
+/// A container handle contains all necessary runtime information like the
+/// name of the service unit.
+///
+/// The implementation of `PodHandleMap` contains functions to access the
+/// parts of this structure while preserving the invariants.
 #[derive(Debug, Default)]
 struct PodHandleMap {
     handles: HashMap<PodKey, PodHandle>,
 }
diff --git a/src/provider/systemdmanager/journal_reader.rs b/src/provider/systemdmanager/journal_reader.rs
index 63e13d3..a3abf82 100644
--- a/src/provider/systemdmanager/journal_reader.rs
+++ b/src/provider/systemdmanager/journal_reader.rs
@@ -9,19 +9,20 @@ use systemd::{journal, journal::JournalRef};
 /// contained messages.
 ///
 /// The options `tail` and `follow` in [`sender`] are taken into account.
-/// If `follow` is `true` then messages are sent until the channel of
-/// [`sender`] is closed. In this case an
+///
+/// If `tail` is set with `Some(line_count)` then only the last
+/// `line_count` messages (or fewer if not enough are available) are sent,
+/// otherwise all available messages are sent.
+///
+/// If `follow` is `true` then additionally all new messages are sent
+/// until the channel of [`sender`] is closed. In this case an
 /// [`Err(kubelet::log::SendError::ChannelClosed)`] will be returned.
 pub async fn send_messages(sender: &mut Sender, invocation_id: &str) -> Result<()> {
     let mut journal = journal::OpenOptions::default().open()?;
     let journal = journal.match_add("_SYSTEMD_INVOCATION_ID", invocation_id)?;

     if let Some(line_count) = sender.tail() {
-        journal.seek_tail()?;
-        let skipped = journal.previous_skip(line_count as u64 + 1)?;
-        if skipped < line_count + 1 {
-            journal.seek_head()?;
-        }
+        seek_journal_backwards(journal, line_count)?;

         if sender.follow() {
             send_remaining_messages(journal, sender).await?;
@@ -40,6 +41,23 @@ pub async fn send_messages(sender: &mut Sender, invocation_id: &str) -> Result<(
     Ok(())
 }

+/// Sets the cursor of the journal to the position before the last `count`
+/// entries so that the next entry is the first of `count` remaining
+/// entries. If the beginning of the journal is reached then the cursor is
+/// set to the position before the first entry.
+fn seek_journal_backwards(journal: &mut JournalRef, count: usize) -> Result<()> {
+    journal.seek_tail()?;
+
+    let entries_to_skip = count + 1;
+    let skipped = journal.previous_skip(entries_to_skip as u64)?;
+    let beginning_reached = skipped < entries_to_skip;
+    if beginning_reached {
+        journal.seek_head()?;
+    }
+
+    Ok(())
+}
+
 /// Sends the given number of messages from the journal.
 async fn send_n_messages(
     journal: &mut JournalRef,
@@ -80,9 +98,11 @@ fn next_message(journal: &mut JournalRef) -> Result<Option<String>> {
             if let Some(value) = entry.value() {
                 String::from_utf8_lossy(value).into()
             } else {
+                // The MESSAGE field contains no text, i.e. `MESSAGE=`.
                 String::new()
             }
         } else {
+            // The journal entry contains no MESSAGE field.
            String::new()
         };
         Some(message)