Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

== 0.5.0 - unreleased

:224: https://github.com/stackabletech/agent/pull/224[#224]

=== Added
* `hostIP` and `podIP` added to the pod status ({224}).

== 0.4.0 - 2021-06-23

:188: https://github.com/stackabletech/agent/pull/188[#188]
Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 9 additions & 13 deletions src/bin/stackable-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,22 @@ async fn main() -> anyhow::Result<()> {
let server_config = ServerConfig {
addr: agent_config.server_ip_address,
port: agent_config.server_port,
cert_file: agent_config.server_cert_file.unwrap_or_default(),
private_key_file: agent_config.server_key_file.unwrap_or_default(),
cert_file: agent_config.server_cert_file.to_owned().unwrap_or_default(),
private_key_file: agent_config.server_key_file.to_owned().unwrap_or_default(),
};

let plugins_directory = agent_config.data_directory.join("plugins");

let krustlet_config = Config {
node_ip: agent_config.server_ip_address,
hostname: agent_config.hostname.clone(),
node_name: agent_config.hostname,
hostname: agent_config.hostname.to_owned(),
node_name: agent_config.hostname.to_owned(),
server_config,
data_dir: agent_config.data_directory.clone(),
plugins_dir: plugins_directory.clone(),
node_labels: agent_config.tags,
data_dir: agent_config.data_directory.to_owned(),
plugins_dir: plugins_directory.to_owned(),
node_labels: agent_config.tags.to_owned(),
max_pods: 110,
bootstrap_file: agent_config.bootstrap_file,
bootstrap_file: agent_config.bootstrap_file.to_owned(),
allow_local_modules: false,
insecure_registries: None,
};
Expand All @@ -89,11 +89,7 @@ async fn main() -> anyhow::Result<()> {

let provider = StackableProvider::new(
kube::Client::new(kubeconfig.clone()),
agent_config.parcel_directory.clone(),
agent_config.config_directory.clone(),
agent_config.log_directory.clone(),
agent_config.session,
agent_config.pod_cidr,
&agent_config,
krustlet_config.max_pods,
)
.await
Expand Down
26 changes: 13 additions & 13 deletions src/provider/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryFrom;
use std::fs;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -18,19 +19,20 @@ use kubelet::{
use log::{debug, error};
use tokio::{runtime::Runtime, sync::RwLock, task};

use crate::config::AgentConfig;
use crate::provider::error::StackableError;
use crate::provider::error::StackableError::{
CrdMissing, KubeError, MissingObjectKey, PodValidationError,
};
use crate::provider::repository::package::Package;
use crate::provider::states::pod::downloading::Downloading;
use crate::provider::states::pod::terminated::Terminated;
use crate::provider::states::pod::PodState;
use crate::provider::systemdmanager::manager::SystemdManager;
use kube::error::ErrorResponse;
use std::collections::HashMap;
use systemdmanager::journal_reader;

use self::states::pod::{initializing::Initializing, terminated::Terminated};

pub struct StackableProvider {
shared: ProviderState,
parcel_directory: PathBuf,
Expand All @@ -53,6 +55,7 @@ pub struct ProviderState {
handles: Arc<RwLock<PodHandleMap>>,
client: Client,
systemd_manager: Arc<SystemdManager>,
server_ip_address: IpAddr,
}

/// Contains handles for running pods.
Expand Down Expand Up @@ -171,27 +174,24 @@ impl ContainerHandle {
impl StackableProvider {
pub async fn new(
client: Client,
parcel_directory: PathBuf,
config_directory: PathBuf,
log_directory: PathBuf,
session: bool,
pod_cidr: String,
agent_config: &AgentConfig,
max_pods: u16,
) -> Result<Self, StackableError> {
let systemd_manager = Arc::new(SystemdManager::new(session, max_pods).await?);
let systemd_manager = Arc::new(SystemdManager::new(agent_config.session, max_pods).await?);

let provider_state = ProviderState {
handles: Default::default(),
client,
systemd_manager,
server_ip_address: agent_config.server_ip_address,
};

let provider = StackableProvider {
shared: provider_state,
parcel_directory,
config_directory,
log_directory,
pod_cidr,
parcel_directory: agent_config.parcel_directory.to_owned(),
config_directory: agent_config.config_directory.to_owned(),
log_directory: agent_config.log_directory.to_owned(),
pod_cidr: agent_config.pod_cidr.to_owned(),
};
let missing_crds = provider.check_crds().await?;
return if missing_crds.is_empty() {
Expand Down Expand Up @@ -255,7 +255,7 @@ impl StackableProvider {
impl Provider for StackableProvider {
type ProviderState = ProviderState;
type PodState = PodState;
type InitialState = Downloading;
type InitialState = Initializing;
type TerminatedState = Terminated;

const ARCH: &'static str = "stackable-linux";
Expand Down
1 change: 1 addition & 0 deletions src/provider/states/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) mod creating_config;
pub(crate) mod creating_service;
pub(crate) mod downloading;
pub(crate) mod downloading_backoff;
pub(crate) mod initializing;
pub(crate) mod installing;
pub(crate) mod running;
pub(crate) mod setup_failed;
Expand Down
76 changes: 76 additions & 0 deletions src/provider/states/pod/initializing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::net::IpAddr;

use anyhow::Result;
use k8s_openapi::api::core::v1::Pod as KubePod;
use k8s_openapi::api::core::v1::PodStatus as KubePodStatus;
use kube::api::Patch;
use kube::api::PatchParams;
use kube::Api;
use kubelet::pod::state::prelude::*;
use log::trace;
use log::warn;
use serde_json::json;

use super::downloading::Downloading;
use crate::provider::{PodState, ProviderState};

#[derive(Default, Debug, TransitionTo)]
#[transition_to(Downloading)]
pub struct Initializing;

#[async_trait::async_trait]
impl State<PodState> for Initializing {
async fn next(
self: Box<Self>,
shared: SharedState<ProviderState>,
_pod_state: &mut PodState,
pod: Manifest<Pod>,
) -> Transition<PodState> {
let (client, server_ip_address) = {
let provider_state = shared.read().await;
(
provider_state.client.clone(),
provider_state.server_ip_address,
)
};

let pod = pod.latest();

let api = Api::namespaced(client, pod.namespace());

match patch_ip_address(&api, pod.name(), server_ip_address).await {
Ok(_) => trace!(
"Status of pod [{}] patched with hostIP and podIP [{}]",
pod.name(),
server_ip_address
),
Err(error) => warn!(
"Status of pod [{}] could not be patched with hostIP and podIP: {}",
pod.name(),
error
),
}

Transition::next(self, Downloading)
}

async fn status(&self, _pod_state: &mut PodState, _pod: &Pod) -> Result<PodStatus> {
Ok(make_status(Phase::Pending, "Initializing"))
}
}

/// Patches the `hostIP` and `podIP` in the pod status.
async fn patch_ip_address(api: &Api<KubePod>, pod_name: &str, ip_address: IpAddr) -> Result<()> {
let patch = json!({
"status": Some(KubePodStatus {
host_ip: Some(ip_address.to_string()),
pod_ip: Some(ip_address.to_string()),
..Default::default()
})
});

api.patch_status(pod_name, &PatchParams::default(), &Patch::Strategic(patch))
.await?;

Ok(())
}