diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index eca9a2e..1d9e8cc 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -2,6 +2,12 @@ == 0.4.0 - unreleased +:188: https://github.com/stackabletech/agent/pull/188[#188] + +=== Added +* Annotation `featureLogs` added to the pods to indicate if logs can be + retrieved with `kubectl logs` ({188}). + == 0.3.0 - 2021-05-27 :165: https://github.com/stackabletech/agent/pull/165[#165] diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 4a884e4..3363c9e 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -2,3 +2,5 @@ ** xref:installation/building.adoc[] ** xref:installation/binaries.adoc[] * xref:configuration.adoc[] +* Monitoring +** xref:monitoring/logs.adoc[] diff --git a/docs/modules/ROOT/pages/monitoring/logs.adoc b/docs/modules/ROOT/pages/monitoring/logs.adoc new file mode 100644 index 0000000..8695ccd --- /dev/null +++ b/docs/modules/ROOT/pages/monitoring/logs.adoc @@ -0,0 +1,28 @@ += Logs + +The logs of a pod can be retrieved with `kubectl logs`. + + $ kubectl logs apache-kafka + [2021-06-01 13:51:03,852] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) + [2021-06-01 13:51:04,361] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler) + [2021-06-01 13:51:04,362] INFO starting (kafka.server.KafkaServer) + +For this to work systemd version 232 or newer must be installed on the +node. This is the case for Debian 10 and CentOS 8 but not for CentOS 7. +The annotation `featureLogs` with a value of `true` or `false` is added +to all pods to indicate the availability of the logs. + + $ kubectl describe pod apache-kafka + … + Annotations: featureLogs: true + … + +If `featureLogs` is `false` then the output of `kubectl logs` is empty. + +The following options are not yet supported: + + * `--limit-bytes` + * `-p --previous` + * `--since` + * `--since-time` + * `--timestamps` diff --git a/src/provider/mod.rs b/src/provider/mod.rs index a8263d3..0f01cb3 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -339,27 +339,27 @@ impl Provider for StackableProvider { container_key ) })?; - let invocation_id = container_handle.invocation_id.ok_or_else(|| { - anyhow!( - "Invocation ID for container [{}] in pod [{:?}] is unknown. \ - The service is probably not started yet.", - container_key, - pod_key - ) - })?; - - task::spawn_blocking(move || { - let result = Runtime::new() - .unwrap() - .block_on(journal_reader::send_messages(&mut sender, &invocation_id)); - if let Err(error) = result { - match error.downcast_ref::() { - Some(SendError::ChannelClosed) => (), - _ => error!("Log could not be sent. {}", error), + if let Some(invocation_id) = container_handle.invocation_id { + task::spawn_blocking(move || { + let result = Runtime::new() + .unwrap() + .block_on(journal_reader::send_messages(&mut sender, &invocation_id)); + + if let Err(error) = result { + match error.downcast_ref::() { + Some(SendError::ChannelClosed) => (), + _ => error!("Log could not be sent. {}", error), + } } - } - }); + }); + } else { + debug!( + "Logs for pod [{:?}] and container [{:?}] cannot be sent \ + because the invocation ID is not available.", + pod_key, container_key + ); + } Ok(()) } diff --git a/src/provider/states/pod/starting.rs b/src/provider/states/pod/starting.rs index 841093a..1d4f40c 100644 --- a/src/provider/states/pod/starting.rs +++ b/src/provider/states/pod/starting.rs @@ -4,12 +4,17 @@ use crate::provider::{ }; use anyhow::{anyhow, Result}; +use kube::{ + api::{Patch, PatchParams}, + Api, Client, +}; use kubelet::pod::state::prelude::*; use kubelet::{ container::ContainerKey, pod::{Pod, PodKey}, }; use log::{debug, error, info}; +use serde_json::json; use std::time::Instant; use tokio::time::{self, Duration}; @@ -53,10 +58,11 @@ async fn start_service_units( ) -> Result<()> { let pod_key = &PodKey::from(pod); - let (systemd_manager, pod_handle) = { + let (client, systemd_manager, pod_handle) = { let provider_state = shared.read().await; let handles = provider_state.handles.read().await; ( + provider_state.client.clone(), provider_state.systemd_manager.clone(), handles.get(&pod_key).map(PodHandle::to_owned), ) @@ -87,8 +93,17 @@ async fn start_service_units( await_startup(&systemd_manager, service_unit, Duration::from_secs(10)).await?; } - let invocation_id = systemd_manager.get_invocation_id(service_unit).await?; - store_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?; + let maybe_invocation_id = systemd_manager.get_invocation_id(service_unit).await.ok(); + if let Some(invocation_id) = &maybe_invocation_id { + store_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?; + } + add_annotation( + &client, + pod, + "featureLogs", + &maybe_invocation_id.is_some().to_string(), + ) + .await?; } Ok(()) @@ -143,3 +158,35 @@ async fn store_invocation_id( let mut handles = provider_state.handles.write().await; handles.set_invocation_id(&pod_key, &container_key, invocation_id) } + +/// Adds an annotation to the given pod. +/// +/// If there is already an annotation with the given key then the value +/// is replaced. +/// The function returns when the patch is sent. It does not await the +/// changes to be visible to the watching clients. +async fn add_annotation(client: &Client, pod: &Pod, key: &str, value: &str) -> kube::Result { + debug!( + "Adding annotation [{}: {}] to pod [{:?}]", + key, + value, + PodKey::from(pod) + ); + + let api: Api = Api::namespaced(client.clone(), pod.namespace()); + + let patch = json!({ + "metadata": { + "annotations": { + key: value + } + } + }); + + api.patch( + pod.name(), + &PatchParams::default(), + &Patch::Strategic(patch), + ) + .await +} diff --git a/src/provider/systemdmanager/manager.rs b/src/provider/systemdmanager/manager.rs index 05bf3d1..7d1a312 100644 --- a/src/provider/systemdmanager/manager.rs +++ b/src/provider/systemdmanager/manager.rs @@ -389,6 +389,8 @@ impl SystemdManager { } /// Retrieves the invocation ID for the given unit. + /// + /// The invocation ID was introduced in systemd version 232. pub async fn get_invocation_id(&self, unit: &str) -> anyhow::Result { self.proxy .load_unit(unit)