This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -19,5 +19,5 @@ serde = "1"
serde_json = "1"
serde_yaml = "0.8"
spectral = "0.6"
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio = { version = "1.6", features = ["macros", "rt-multi-thread"] }
uuid = { version = "0.8", features = ["v4"] }
113 changes: 112 additions & 1 deletion tests/service.rs
@@ -1,5 +1,6 @@
mod test;
use std::time::Duration;
use futures::future::join_all;
use std::{fmt, time::Duration};
use test::prelude::*;

#[test]
@@ -66,3 +67,113 @@ fn restart_after_ungraceful_shutdown_should_succeed() {
client.verify_pod_condition(&pod, "Ready");
}
}

#[tokio::test(flavor = "multi_thread")]
async fn starting_and_stopping_100_pods_simultaneously_should_succeed() {
Member

A general comment on this test: my understanding is that this is meant to stress test a single agent, correct?
In that case it might be useful to pick one of the available nodes and explicitly assign all pods to that node, either by setting nodeName directly in the spec or by adding a nodeSelector that targets that host.
Alternatively, maybe make it "NUM_PODS_PER_NODE" and multiply by the number of nodes... but that might create additional issues depending on how the scheduler assigns them.

Currently this runs fairly dissimilar tests from the perspective of an individual agent, depending on whether it is run against a cluster with 1, 3, or 10 nodes.
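
For illustration, a nodeSelector variant of this suggestion could look roughly like the following in the pod spec. This is a sketch, not code from this PR; it assumes the agent registers the standard kubernetes.io/hostname label and that node_name has been retrieved as in the test below.

let pod_spec = format!(
    "
    apiVersion: v1
    kind: Pod
    metadata:
      name: agent-service-integration-test-race-condition
    spec:
      containers:
        - name: noop-service
          image: noop-service:1.0.0
          command:
            - noop-service-1.0.0/start.sh
      tolerations:
        - key: kubernetes.io/arch
          operator: Equal
          value: stackable-linux
      # Hypothetical alternative to setting nodeName directly: pin the pod
      # to the chosen host via its well-known hostname label.
      nodeSelector:
        kubernetes.io/hostname: {node_name}
    ",
    node_name = node_name
);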

Member Author

All pods are now scheduled on the same node via nodeName.

let mut client = KubeClient::new()
.await
.expect("Kubernetes client could not be created");
client.timeouts.create = Duration::from_secs(60);
client.timeouts.delete = Duration::from_secs(60);
client.timeouts.verify_pod_condition = Duration::from_secs(60);

setup_repository_async(&client)
.await
.expect("Repository could not be setup.");

const NUM_PODS: u32 = 100;

let node = client
.list_labeled::<Node>("kubernetes.io/arch=stackable-linux")
.await
.expect("List of Stackable nodes could not be retrieved")
.into_iter()
.next()
.expect("No Stackable node found");

let node_name = node.metadata.name.clone().expect("Node has no name");

let allocatable_pods = get_allocatable_pods(&node);

// This assertion assumes that either the allocated pods are already
// subtracted from `allocatable_pods` (which is currently not the
// case) or that no other pods are started while testing.
assert!(
NUM_PODS <= allocatable_pods,
"The test case tries to create {num} pods but only {max} pods \
are allocatable on the node {node_name}.",
num = NUM_PODS,
max = allocatable_pods,
node_name = node_name
);

let pod_spec = format!(
    "
    apiVersion: v1
    kind: Pod
    metadata:
      name: agent-service-integration-test-race-condition
    spec:
      containers:
        - name: noop-service
          image: noop-service:1.0.0
          command:
            - noop-service-1.0.0/start.sh
      tolerations:
        - key: kubernetes.io/arch
          operator: Equal
          value: stackable-linux
      nodeName: {node_name}
    ",
    node_name = node_name
);

let pod_specs = (0..NUM_PODS)
.map(|_| with_unique_name(&pod_spec))
.collect::<Vec<_>>();

let (pods, creation_errors) =
partition_results(join_all(pod_specs.iter().map(|spec| client.create::<Pod>(spec))).await);
let pods_created = pods.len();

let (ready_successes, ready_errors) = partition_results(
join_all(
pods.iter()
.map(|pod| client.verify_pod_condition(pod, "Ready")),
)
.await,
);
let pods_ready = ready_successes.len();

let (deletion_successes, deletion_errors) =
partition_results(join_all(pods.into_iter().map(|pod| client.delete(pod))).await);
let pods_deleted = deletion_successes.len();

let mut errors = Vec::new();
errors.extend(creation_errors);
errors.extend(ready_errors);
errors.extend(deletion_errors);

if let Some(error) = errors.first() {
panic!(
"Pods: {created}/{total} created, {ready}/{created} ready, {deleted}/{created} deleted; Error: {error}",
total = NUM_PODS,
created = pods_created,
ready = pods_ready,
deleted = pods_deleted,
error = error
);
}
}

fn partition_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>)
where
E: fmt::Debug,
T: fmt::Debug,
{
let (successes, errors) = results.into_iter().partition::<Vec<_>, _>(Result::is_ok);
let unwrapped_successes = successes.into_iter().map(Result::unwrap).collect();
let unwrapped_errors = errors.into_iter().map(Result::unwrap_err).collect();

(unwrapped_successes, unwrapped_errors)
}
20 changes: 16 additions & 4 deletions tests/test/kube.rs
@@ -333,10 +333,6 @@ impl KubeClient {
.any(|condition| condition.type_ == condition_type && condition.status == "True")
};

if is_condition_true(&pod) {
return Ok(());
}

let timeout_secs = self.timeouts.verify_pod_condition.as_secs() as u32;
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);

@@ -345,6 +341,12 @@
.timeout(timeout_secs);
let mut stream = pods.watch(&lp, "0").await?.boxed();

let pod = pods.get_status(&pod.name()).await?;

if is_condition_true(&pod) {
return Ok(());
}

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(pod) = status {
if is_condition_true(&pod) {
@@ -442,3 +444,13 @@ pub fn get_node_taints(node: &Node) -> Vec<Taint> {
.and_then(|spec| spec.taints.clone())
.unwrap_or_else(Vec::new)
}

/// Returns the number of allocatable pods of the given node.
pub fn get_allocatable_pods(node: &Node) -> u32 {
node.status
.as_ref()
.and_then(|status| status.allocatable.as_ref())
Member

There is nothing wrong with this test, just something I noticed while playing around:
This is not updated by the Krustlet to reflect pods already assigned to this node. In my case I had 1 pod running on a worker, max pods was set to 110, and I created 110 additional pods in the test (limited to this single node), which resulted in one "pending" pod: the check was satisfied (110 <= 110) but didn't account for the one additional pod.

Member Author

I was aware of this behavior but too lazy to write a comment. I will do that.

Member Author

Comment added. Thanks for the thorough review.

.and_then(|allocatable| allocatable.get("pods"))
.and_then(|allocatable_pods| allocatable_pods.0.parse().ok())
.unwrap_or_default()
}
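
Relating to the review thread above: get_allocatable_pods returns the node's raw pod capacity, which the agent does not reduce for pods already assigned to the node. A minimal sketch of how the remaining capacity could be derived is shown below; the helper name and the idea of passing in the pods already scheduled on the node are assumptions for illustration, not part of this PR.

/// Hypothetical helper: number of pod slots still free on the node,
/// assuming `pods_on_node` contains every pod already scheduled there
/// (e.g. listed with a field selector on `spec.nodeName`).
pub fn get_free_pod_slots(node: &Node, pods_on_node: &[Pod]) -> u32 {
    // Saturating subtraction avoids underflow if more pods are scheduled
    // than the node nominally allows.
    get_allocatable_pods(node).saturating_sub(pods_on_node.len() as u32)
}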
2 changes: 1 addition & 1 deletion tests/test/prelude.rs
@@ -1,6 +1,6 @@
pub use super::assertions::*;
pub use super::kube::*;
pub use super::repository::setup_repository;
pub use super::repository::*;
pub use super::temporary_resource::TemporaryResource;

pub use indoc::{formatdoc, indoc};
33 changes: 20 additions & 13 deletions tests/test/repository.rs
@@ -1,10 +1,22 @@
use super::prelude::TestKubeClient;
use indoc::indoc;
use super::prelude::{KubeClient, TestKubeClient};
use anyhow::Result;
use kube_derive::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

const REPO_SPEC: &str = "
    apiVersion: stable.stackable.de/v1
    kind: Repository
    metadata:
      name: integration-test-repository
      namespace: default
    spec:
      repo_type: StackableRepo
      properties:
        url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/
";

/// Specification of a Stackable repository
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
@@ -25,16 +37,11 @@ pub enum RepoType {

pub fn setup_repository(client: &TestKubeClient) {
client.apply_crd(&Repository::crd());
client.apply::<Repository>(REPO_SPEC);
}

client.apply::<Repository>(indoc! {"
    apiVersion: stable.stackable.de/v1
    kind: Repository
    metadata:
      name: integration-test-repository
      namespace: default
    spec:
      repo_type: StackableRepo
      properties:
        url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/
"});
pub async fn setup_repository_async(client: &KubeClient) -> Result<()> {
client.apply_crd(&Repository::crd()).await?;
client.apply::<Repository>(REPO_SPEC).await?;
Ok(())
}