This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
11 changes: 11 additions & 0 deletions CHANGELOG.adoc
@@ -3,6 +3,7 @@
== 0.6.0 - unreleased

:262: https://github.com/stackabletech/agent/pull/262[#262]
:263: https://github.com/stackabletech/agent/pull/263[#263]
:267: https://github.com/stackabletech/agent/pull/267[#267]
:270: https://github.com/stackabletech/agent/pull/270[#270]
:273: https://github.com/stackabletech/agent/pull/273[#273]
@@ -23,6 +24,16 @@
but not any longer with versions prior to v1.19 ({267}).
* Error message improved which is logged if a systemd unit file cannot
be created ({276}).
* Handling of service restarts moved from the Stackable agent to
systemd ({263}).

=== Removed
* Check removed if a service starts up correctly within 10 seconds.
systemd manages restarts now and the Stackable agent cannot detect if
a service is in a restart loop ({263}).

=== Fixed
* Systemd services in session mode are restarted after a reboot ({263}).

== 0.5.0 - 2021-07-26

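As an illustration of the changelog entries above ({263}): delegating restart handling to systemd usually means that the generated unit itself carries the restart policy, and enabling a session-mode unit for default.target is what brings it back after a reboot. The following Rust sketch renders such a unit template. It is hypothetical and not the agent's actual template; the directive values and paths are placeholders.

// Hypothetical sketch: a unit that lets systemd own restart handling.
// Restart=always makes systemd restart the service on failure, and
// WantedBy=default.target lets an enabled session-mode unit start again
// after a reboot.
fn render_unit(description: &str, exec_start: &str) -> String {
    format!(
        "[Unit]\n\
         Description={}\n\
         \n\
         [Service]\n\
         ExecStart={}\n\
         Restart=always\n\
         RestartSec=2\n\
         \n\
         [Install]\n\
         WantedBy=default.target\n",
        description, exec_start
    )
}

fn main() {
    // Example values only.
    println!("{}", render_unit("Example Stackable service", "/opt/example/bin/run"));
}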
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -25,6 +25,7 @@ krator = { git = "https://github.com/stackabletech/krustlet.git", branch = "stac
kube = { version= "0.48", default-features = false, features = ["derive", "native-tls"] }
kubelet = { git = "https://github.com/stackabletech/krustlet.git", branch = "stackable_patches_v0.7.0", default-features = true, features= ["derive", "cli"] } # version = "0.7"
Inflector = "0.11"
json-patch = "0.2"
lazy_static = "1.4"
log = "0.4"
multimap = "0.8"
4 changes: 0 additions & 4 deletions docs/modules/ROOT/pages/services.adoc
@@ -15,7 +15,3 @@ A pod which provides a service should never terminate on its own, so the
command:
- <service-command>
restartPolicy: Always

After a container command is executed the agent waits for 10 seconds
before the container status is set to running. When all containers are
running, also the pod phase is switched from `Pending` to `Running`.
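The documentation above keeps its core requirement: a service pod runs a long-lived command and sets restartPolicy: Always. Purely as an illustration (not code from this PR), the same requirement expressed with the k8s-openapi types the agent already depends on could look like this; the container name and command are placeholders.

use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};

// Illustration only: a minimal pod with a long-running service command and
// restartPolicy: Always, matching the documented requirement.
fn example_service_pod() -> Pod {
    Pod {
        spec: Some(PodSpec {
            containers: vec![Container {
                name: "service".to_string(),
                command: Some(vec!["<service-command>".to_string()]),
                ..Default::default()
            }],
            restart_policy: Some("Always".to_string()),
            ..Default::default()
        }),
        ..Default::default()
    }
}

fn main() {
    // Print the generated spec for inspection.
    println!("{:#?}", example_service_pod());
}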
42 changes: 41 additions & 1 deletion src/provider/kubernetes/status.rs
@@ -1,7 +1,11 @@
//! Functions for patching the pod status

use anyhow::anyhow;
use k8s_openapi::api::core::v1::Pod as KubePod;
use kube::{Api, Client};
use kube::{
api::{Patch, PatchParams},
Api, Client,
};
use kubelet::{
container::{ContainerKey, Status},
pod::Pod,
@@ -30,3 +34,39 @@ pub async fn patch_container_status(
);
}
}

/// Patches the restart count of a container.
pub async fn patch_restart_count(
client: &Client,
pod: &Pod,
container_key: &ContainerKey,
restart_count: u32,
) -> anyhow::Result<()> {
let api: Api<KubePod> = Api::namespaced(client.clone(), pod.namespace());

let index = pod
.container_status_index(container_key)
.ok_or_else(|| anyhow!("Container not found"))?;

let container_type = if container_key.is_init() {
"initContainer"
} else {
"container"
};

let patch = json_patch::Patch(vec![json_patch::PatchOperation::Replace(
json_patch::ReplaceOperation {
path: format!("/status/{}Statuses/{}/restartCount", container_type, index),
value: restart_count.into(),
},
)]);

api.patch_status(
pod.name(),
&PatchParams::default(),
&Patch::<()>::Json(patch),
)
.await?;

Ok(())
}
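For reference, the single replace operation built by patch_restart_count serializes to a standard JSON Patch document before it is sent via patch_status. The standalone sketch below shows that wire format, assuming the json-patch 0.2 and serde_json crates; the container index and restart count are example values, and init containers would use the initContainerStatuses path instead.

use json_patch::{Patch, PatchOperation, ReplaceOperation};

fn main() {
    // Build the same kind of single-operation patch as patch_restart_count.
    let patch = Patch(vec![PatchOperation::Replace(ReplaceOperation {
        path: "/status/containerStatuses/0/restartCount".to_string(),
        value: serde_json::json!(3),
    })]);

    // Prints something like:
    // [{"op":"replace","path":"/status/containerStatuses/0/restartCount","value":3}]
    println!("{}", serde_json::to_string(&patch).unwrap());
}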
75 changes: 41 additions & 34 deletions src/provider/states/pod/running.rs
@@ -11,17 +11,15 @@ use kubelet::{
use log::{debug, info, trace, warn};
use tokio::time::Duration;

use super::{installing::Installing, starting::Starting, terminated::Terminated};
use super::terminated::Terminated;
use crate::provider::{
kubernetes::{
accessor::{restart_policy, RestartPolicy},
status::patch_container_status,
},
kubernetes::status::{patch_container_status, patch_restart_count},
systemdmanager::service::ServiceState,
PodHandle, PodState, ProviderState,
};

#[derive(Debug, TransitionTo)]
#[transition_to(Installing, Starting, Terminated)]
#[transition_to(Terminated)]
pub struct Running {
pub transition_time: Time,
}
@@ -66,7 +64,7 @@ impl State<PodState> for Running {
// Interruption of this loop is triggered externally by the Krustlet code when
// - the pod which this state machine refers to gets deleted
// - Krustlet shuts down
while !running_containers.is_empty() && !container_failed {
while !running_containers.is_empty() {
tokio::time::sleep(Duration::from_secs(10)).await;
trace!(
"Checking if service {} is still running.",
@@ -79,23 +77,23 @@
for (container_key, container_handle) in running_containers.iter() {
let systemd_service = &container_handle.systemd_service;

match systemd_service.is_running().await {
Ok(true) => {}
Ok(false) => match systemd_service.failed().await {
Ok(true) => failed_containers
.push((container_key.to_owned(), container_handle.to_owned())),
Ok(false) => succeeded_containers
.push((container_key.to_owned(), container_handle.to_owned())),
Err(dbus_error) => warn!(
"Error querying Failed property for Unit [{}] of service [{}]: [{}]",
match systemd_service.service_state().await {
Ok(ServiceState::Created) => {
warn!(
"The unit [{}] of service [{}] was not started. \
This should not happen. Ignoring this state for now.",
systemd_service.file(),
pod_state.service_name,
dbus_error
),
},
pod_state.service_name
);
}
Ok(ServiceState::Started) => {}
Ok(ServiceState::Succeeded) => succeeded_containers
.push((container_key.to_owned(), container_handle.to_owned())),
Ok(ServiceState::Failed) => failed_containers
.push((container_key.to_owned(), container_handle.to_owned())),
Err(dbus_error) => {
warn!(
"Error querying ActiveState for Unit [{}] of service [{}]: [{}].",
"Error querying state for unit [{}] of service [{}]: [{}].",
systemd_service.file(),
pod_state.service_name,
dbus_error
@@ -132,29 +130,38 @@
)
.await;
running_containers.remove(container_key);
container_failed = true;
}

for container_handle in running_containers.values() {
for (container_key, container_handle) in running_containers.iter() {
trace!(
"Unit [{}] of service [{}] still running ...",
container_handle.service_unit,
pod_state.service_name
);
}

container_failed = !failed_containers.is_empty();
}

if container_failed {
if restart_policy(&pod) == RestartPolicy::Never {
Transition::next(self, Terminated { successful: false })
} else {
debug!("Restart policy is set to restart, starting...");
Transition::next(self, Starting {})
match container_handle.systemd_service.restart_count().await {
Ok(restart_count) => {
if let Err(error) =
patch_restart_count(&client, &pod, container_key, restart_count).await
{
warn!("Could not patch restart count: {}", error);
}
}
Err(error) => warn!(
"Could retrieve restart count from unit [{}]: {}",
container_handle.service_unit, error
),
}
}
} else {
Transition::next(self, Terminated { successful: true })
}

Transition::next(
self,
Terminated {
successful: !container_failed,
},
)
}

async fn status(&self, pod_state: &mut PodState, _pod: &Pod) -> anyhow::Result<PodStatus> {
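The loop above classifies each container by the state of its systemd unit and pushes the unit's restart count into the pod status. The sketch below restates that classification with a local stand-in enum, since the real systemdmanager::service::ServiceState definition is not part of this diff; it only illustrates how the succeeded and failed buckets end up driving the Terminated { successful } transition.

// Local stand-in for the states named in the diff; not the agent's actual type.
#[derive(Debug, PartialEq)]
enum ServiceState {
    Created,
    Started,
    Succeeded,
    Failed,
}

// Mirrors the classification in the Running loop: Started units keep the pod
// running, Succeeded and Failed units are collected for termination handling,
// and Created (never started) is only logged.
fn classify(units: &[(&str, ServiceState)]) -> (Vec<String>, Vec<String>) {
    let mut succeeded = Vec::new();
    let mut failed = Vec::new();

    for (unit, state) in units {
        match state {
            ServiceState::Created => eprintln!("unit [{}] was never started", unit),
            ServiceState::Started => {}
            ServiceState::Succeeded => succeeded.push(unit.to_string()),
            ServiceState::Failed => failed.push(unit.to_string()),
        }
    }

    (succeeded, failed)
}

fn main() {
    let (succeeded, failed) = classify(&[
        ("a.service", ServiceState::Started),
        ("b.service", ServiceState::Failed),
    ]);
    // Once no container is left running, the pod transitions to
    // Terminated { successful: failed.is_empty() }.
    println!("succeeded: {:?}, failed: {:?}", succeeded, failed);
}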
72 changes: 11 additions & 61 deletions src/provider/states/pod/starting.rs
@@ -1,14 +1,10 @@
use super::running::Running;
use crate::provider::{
kubernetes::{
accessor::{restart_policy, RestartPolicy},
status::patch_container_status,
},
systemdmanager::service::SystemdService,
PodHandle, PodState, ProviderState,
kubernetes::status::patch_container_status, systemdmanager::service::ServiceState, PodHandle,
PodState, ProviderState,
};

use anyhow::{anyhow, Result};
use anyhow::Result;
use kube::{
api::{Patch, PatchParams},
Api, Client,
@@ -17,8 +13,6 @@ use kubelet::pod::{Pod, PodKey};
use kubelet::{container::Status, pod::state::prelude::*};
use log::{debug, error, info};
use serde_json::json;
use std::time::Instant;
use tokio::time::{self, Duration};

#[derive(Default, Debug, TransitionTo)]
#[transition_to(Running)]
@@ -50,9 +44,7 @@ impl State<PodState> for Starting {

/// Starts the service units for the containers of the given pod.
///
/// The units are started and enabled if they are not already running.
/// The startup is considered successful if the unit is still running
/// after 10 seconds.
/// The units are started and enabled if they were not already started.
async fn start_service_units(
shared: SharedState<ProviderState>,
pod_state: &PodState,
@@ -72,32 +64,19 @@

for (container_key, container_handle) in pod_handle.unwrap_or_default() {
let systemd_service = &container_handle.systemd_service;
let service_unit = &container_handle.service_unit;

if systemd_service.is_running().await? {
debug!(
"Unit [{}] for service [{}] is already running. Skip startup.",
systemd_service.file(),
&pod_state.service_name
);
} else {
let service_unit = &container_handle.service_unit;

if systemd_service.service_state().await? == ServiceState::Created {
info!("Starting systemd unit [{}]", service_unit);
systemd_manager.start(service_unit).await?;

info!("Enabling systemd unit [{}]", service_unit);
systemd_manager.enable(service_unit).await?;

if restart_policy(pod) == RestartPolicy::Always {
// TODO: does this need to be configurable, or ar we happy with a hard coded value
// for now. I've briefly looked at the podspec and couldn't identify a good field
// to use for this - also, currently this starts containers (= systemd units) in
// order and waits 10 seconds for every unit, so a service with five containers
// would take 50 seconds until it reported running - which is totally fine in case
// the units actually depend on each other, but a case could be made for waiting
// once at the end
await_startup(systemd_service, Duration::from_secs(10)).await?;
}
} else {
debug!(
"Unit [{}] for service [{}] was already started. Skipping startup.",
service_unit, &pod_state.service_name
);
}

add_annotation(
@@ -114,35 +93,6 @@
Ok(())
}

/// Checks if the given service unit is still running after the given duration.
async fn await_startup(systemd_service: &SystemdService, duration: Duration) -> Result<()> {
let start_time = Instant::now();
while start_time.elapsed() < duration {
time::sleep(Duration::from_secs(1)).await;

debug!(
"Checking if unit [{}] is still up and running.",
systemd_service.file()
);

if systemd_service.is_running().await? {
debug!(
"Service [{}] still running after [{}] seconds",
systemd_service.file(),
start_time.elapsed().as_secs()
);
} else {
return Err(anyhow!(
"Unit [{}] stopped unexpectedly during startup after [{}] seconds.",
systemd_service.file(),
start_time.elapsed().as_secs()
));
}
}

Ok(())
}

/// Adds an annotation to the given pod.
///
/// If there is already an annotation with the given key then the value
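With the await_startup check removed, start_service_units is now idempotent: a unit is started and enabled only while it is still in the Created state, and later reconcile passes skip it. A toy sketch of that pattern follows; the in-memory Units type is invented here and is not the agent's SystemdManager.

use std::collections::HashSet;

// Toy illustration of the idempotent startup pattern: a unit is started and
// enabled only on its first pass, later passes are no-ops.
struct Units {
    started: HashSet<String>,
}

impl Units {
    fn start_if_needed(&mut self, unit: &str) {
        if self.started.insert(unit.to_string()) {
            println!("Starting and enabling systemd unit [{}]", unit);
        } else {
            println!("Unit [{}] was already started. Skipping startup.", unit);
        }
    }
}

fn main() {
    let mut units = Units { started: HashSet::new() };
    units.start_if_needed("example.service");
    units.start_if_needed("example.service"); // second pass is skipped
}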