Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

== 0.4.0 - unreleased

:188: https://github.com/stackabletech/agent/pull/188[#188]

=== Added
* Annotation `featureLogs` added to the pods to indicate if logs can be
retrieved with `kubectl logs` ({188}).

== 0.3.0 - 2021-05-27

:165: https://github.com/stackabletech/agent/pull/165[#165]
Expand Down
2 changes: 2 additions & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
** xref:installation/building.adoc[]
** xref:installation/binaries.adoc[]
* xref:configuration.adoc[]
* Monitoring
** xref:monitoring/logs.adoc[]
28 changes: 28 additions & 0 deletions docs/modules/ROOT/pages/monitoring/logs.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
= Logs

The logs of a pod can be retrieved with `kubectl logs`.

$ kubectl logs apache-kafka
[2021-06-01 13:51:03,852] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-06-01 13:51:04,361] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-06-01 13:51:04,362] INFO starting (kafka.server.KafkaServer)

For this to work systemd version 232 or newer must be installed on the
node. This is the case for Debian 10 and CentOS 8 but not for CentOS 7.
The annotation `featureLogs` with a value of `true` or `false` is added
to all pods to indicate the availability of the logs.

$ kubectl describe pod apache-kafka
Annotations: featureLogs: true

If `featureLogs` is `false` then the output of `kubectl logs` is empty.

The following options are not yet supported:

* `--limit-bytes`
* `-p --previous`
* `--since`
* `--since-time`
* `--timestamps`
38 changes: 19 additions & 19 deletions src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,27 +339,27 @@ impl Provider for StackableProvider {
container_key
)
})?;
let invocation_id = container_handle.invocation_id.ok_or_else(|| {
anyhow!(
"Invocation ID for container [{}] in pod [{:?}] is unknown. \
The service is probably not started yet.",
container_key,
pod_key
)
})?;

task::spawn_blocking(move || {
let result = Runtime::new()
.unwrap()
.block_on(journal_reader::send_messages(&mut sender, &invocation_id));

if let Err(error) = result {
match error.downcast_ref::<SendError>() {
Some(SendError::ChannelClosed) => (),
_ => error!("Log could not be sent. {}", error),
if let Some(invocation_id) = container_handle.invocation_id {
task::spawn_blocking(move || {
let result = Runtime::new()
.unwrap()
.block_on(journal_reader::send_messages(&mut sender, &invocation_id));

if let Err(error) = result {
match error.downcast_ref::<SendError>() {
Some(SendError::ChannelClosed) => (),
_ => error!("Log could not be sent. {}", error),
}
}
}
});
});
} else {
debug!(
"Logs for pod [{:?}] and container [{:?}] cannot be sent \
because the invocation ID is not available.",
pod_key, container_key
);
}

Ok(())
}
Expand Down
53 changes: 50 additions & 3 deletions src/provider/states/pod/starting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ use crate::provider::{
};

use anyhow::{anyhow, Result};
use kube::{
api::{Patch, PatchParams},
Api, Client,
};
use kubelet::pod::state::prelude::*;
use kubelet::{
container::ContainerKey,
pod::{Pod, PodKey},
};
use log::{debug, error, info};
use serde_json::json;
use std::time::Instant;
use tokio::time::{self, Duration};

Expand Down Expand Up @@ -53,10 +58,11 @@ async fn start_service_units(
) -> Result<()> {
let pod_key = &PodKey::from(pod);

let (systemd_manager, pod_handle) = {
let (client, systemd_manager, pod_handle) = {
let provider_state = shared.read().await;
let handles = provider_state.handles.read().await;
(
provider_state.client.clone(),
provider_state.systemd_manager.clone(),
handles.get(&pod_key).map(PodHandle::to_owned),
)
Expand Down Expand Up @@ -87,8 +93,17 @@ async fn start_service_units(
await_startup(&systemd_manager, service_unit, Duration::from_secs(10)).await?;
}

let invocation_id = systemd_manager.get_invocation_id(service_unit).await?;
store_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?;
let maybe_invocation_id = systemd_manager.get_invocation_id(service_unit).await.ok();
if let Some(invocation_id) = &maybe_invocation_id {
store_invocation_id(shared.clone(), pod_key, &container_key, &invocation_id).await?;
}
add_annotation(
&client,
pod,
"featureLogs",
&maybe_invocation_id.is_some().to_string(),
)
.await?;
}

Ok(())
Expand Down Expand Up @@ -143,3 +158,35 @@ async fn store_invocation_id(
let mut handles = provider_state.handles.write().await;
handles.set_invocation_id(&pod_key, &container_key, invocation_id)
}

/// Adds an annotation to the given pod.
///
/// If there is already an annotation with the given key then the value
/// is replaced.
/// The function returns when the patch is sent. It does not await the
/// changes to be visible to the watching clients.
async fn add_annotation(client: &Client, pod: &Pod, key: &str, value: &str) -> kube::Result<Pod> {
debug!(
"Adding annotation [{}: {}] to pod [{:?}]",
key,
value,
PodKey::from(pod)
);

let api: Api<Pod> = Api::namespaced(client.clone(), pod.namespace());

let patch = json!({
"metadata": {
"annotations": {
key: value
}
}
});

api.patch(
pod.name(),
&PatchParams::default(),
&Patch::Strategic(patch),
)
.await
}
2 changes: 2 additions & 0 deletions src/provider/systemdmanager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ impl SystemdManager {
}

/// Retrieves the invocation ID for the given unit.
///
/// The invocation ID was introduced in systemd version 232.
pub async fn get_invocation_id(&self, unit: &str) -> anyhow::Result<String> {
self.proxy
.load_unit(unit)
Expand Down