Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 48 additions & 6 deletions src/provider/states/running.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use anyhow::anyhow;
use k8s_openapi::api::core::v1::{
ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus, PodCondition,
};
use kubelet::pod::Pod;
use kubelet::state::prelude::*;
use kubelet::state::{State, Transition};
use log::{debug, trace};
use log::{debug, info, trace};

use crate::provider::states::failed::Failed;
use crate::provider::states::installing::Installing;
use crate::provider::states::make_status_with_containers_and_condition;
use crate::provider::PodState;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use k8s_openapi::chrono;
use tokio::time::Duration;

#[derive(Debug, TransitionTo)]
#[transition_to(Failed, Running, Installing)]
Expand All @@ -34,17 +36,57 @@ impl State<PodState> for Running {
pod_state: &mut PodState,
_pod: &Pod,
) -> Transition<PodState> {
// We loop here indefinitely and "wake up" periodically to check if the service is still
// up and running
// Interruption of this loop is triggered externally by the Krustlet code when
// - the pod which this state machine refers to gets deleted
// - Krustlet shuts down
loop {
tokio::select! {
_ = tokio::time::delay_for(std::time::Duration::from_secs(10)) => {
trace!("Checking if service {} is still running.", &pod_state.service_name);
tokio::time::delay_for(Duration::from_secs(10)).await;
trace!(
"Checking if service {} is still running.",
&pod_state.service_name
);

// Iterate over all units and check their state
// if the [`service_units`] Option is a None variant, return a failed state
// as we need to run something otherwise we are not doing anything
let systemd_units = match &pod_state.service_units {
Some(units) => units,
None => return Transition::Complete(Err(anyhow!(format!("No systemd units found for service [{}], this should not happen, please report a bug for this!", pod_state.service_name)))),
};

for unit in systemd_units {
match pod_state.systemd_manager.is_running(&unit.get_name()) {
Ok(true) => trace!(
"Unit [{}] of service [{}] still running ...",
&unit.get_name(),
pod_state.service_name
),
Ok(false) => {
info!("Unit [{}] for service [{}] failed unexpectedly, transitioning to failed state.", pod_state.service_name, unit.get_name());
return Transition::next(
self,
Failed {
message: "".to_string(),
},
);
}
Err(dbus_error) => {
info!(
"Error querying ActiveState for Unit [{}] of service [{}]: [{}].",
pod_state.service_name,
unit.get_name(),
dbus_error
);
return Transition::Complete(Err(dbus_error));
}
}
}
// TODO: We are not watching the service yet, need to subscribe to events and
// react to those
}
}

// test
async fn json_status(
&self,
pod_state: &mut PodState,
Expand Down
36 changes: 35 additions & 1 deletion src/provider/states/starting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use crate::provider::states::failed::Failed;
use crate::provider::states::running::Running;
use crate::provider::states::setup_failed::SetupFailed;
use crate::provider::PodState;
use log::{error, info, warn};
use anyhow::anyhow;
use log::{debug, error, info, warn};
use std::time::Instant;
use tokio::time::Duration;

#[derive(Default, Debug, TransitionTo)]
#[transition_to(Running, Failed, SetupFailed)]
Expand Down Expand Up @@ -36,6 +39,37 @@ impl State<PodState> for Starting {
);
return Transition::Complete(Err(enable_error));
}

let start_time = Instant::now();
// TODO: does this need to be configurable, or ar we happy with a hard coded value
// for now. I've briefly looked at the podspec and couldn't identify a good field
// to use for this - also, currently this starts containers (= systemd units) in
// order and waits 10 seconds for every unit, so a service with five containers
// would take 50 seconds until it reported running - which is totally fine in case
// the units actually depend on each other, but a case could be made for waiting
// once at the end
while start_time.elapsed().as_secs() < 10 {
tokio::time::delay_for(Duration::from_secs(1)).await;
debug!(
"Checking if unit [{}] is still up and running.",
&unit.get_name()
);
match pod_state.systemd_manager.is_running(&unit.get_name()) {
Ok(true) => debug!(
"Service [{}] still running after [{}] seconds",
&unit.get_name(),
start_time.elapsed().as_secs()
),
Ok(false) => {
return Transition::Complete(Err(anyhow!(format!(
"Unit [{}] stopped unexpectedly during startup after [{}] seconds.",
&unit.get_name(),
start_time.elapsed().as_secs()
))))
}
Err(dbus_error) => return Transition::Complete(Err(dbus_error)),
}
}
}
} else {
warn!(
Expand Down
30 changes: 25 additions & 5 deletions src/provider/systemdmanager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//!
use crate::provider::systemdmanager::systemdunit::SystemDUnit;
use anyhow::anyhow;
use dbus::arg::{AppendAll, ReadAll};
use dbus::arg::{AppendAll, ReadAll, Variant};
use dbus::blocking::SyncConnection;
use dbus::strings::Member;
use dbus::Path;
Expand All @@ -25,6 +25,7 @@ pub enum UnitTypes {
const SYSTEMD_DESTINATION: &str = "org.freedesktop.systemd1";
const SYSTEMD_NODE: &str = "/org/freedesktop/systemd1";
const SYSTEMD_MANAGER_INTERFACE: &str = "org.freedesktop.systemd1.Manager";
const DBUS_PROPERTIES_INTERFACE: &str = "org.freedesktop.DBus.Properties";

/// The main way of interacting with this module, this struct offers
/// the public methods for managing service units.
Expand Down Expand Up @@ -171,10 +172,7 @@ impl SystemdManager {
}

let unit_file = self.units_directory.join(&unit_name);
if linked_unit_file
&& unit_file.exists()
&& unit_file.symlink_metadata()?.file_type().is_file()
{
if unit_file.exists() && unit_file.symlink_metadata()?.file_type().is_file() {
// Handle the special case where we need to replace an actual file with a symlink
// This only occurs when switching from writing the file
// directly into the units folder to using a linked file - should not happen in practice
Expand Down Expand Up @@ -357,6 +355,28 @@ impl SystemdManager {
}
}

pub fn is_running(&self, unit: &str) -> Result<bool, anyhow::Error> {
let unit_node = self
.method_call("GetUnit", (&unit,))
.map(|r: (Path,)| r.0)?;

let proxy = self
.connection
.with_proxy(SYSTEMD_DESTINATION, &unit_node, self.timeout);

let active_state = proxy
.method_call(
DBUS_PROPERTIES_INTERFACE,
"Get",
("org.freedesktop.systemd1.Unit", "ActiveState"),
)
.map(|r: (Variant<String>,)| r.0)?;

// TODO: I think this can panic, there should be a get() method on Variant that returns
// an option, but I've not yet been able to get that to work
Ok(active_state.0 == "active")
}

// Symlink a unit file into the systemd unit folder
// This is not public on purpose, as [create] should be the normal way to link unit files
// when using this crate
Expand Down