diff --git a/Cargo.lock b/Cargo.lock index b93e6fd..8ec51a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,7 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 - [[package]] name = "Inflector" version = "0.11.4" @@ -1394,6 +1392,7 @@ dependencies = [ "once_cell", "pin-project-lite", "signal-hook-registry", + "tokio-macros", "winapi", ] @@ -1407,6 +1406,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-macros" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 017a290..56b5cc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,5 +19,5 @@ serde = "1" serde_json = "1" serde_yaml = "0.8" spectral = "0.6" -tokio = { version = "1", features = ["rt-multi-thread"] } +tokio = { version = "1.6", features = ["macros", "rt-multi-thread"] } uuid = { version = "0.8", features = ["v4"] } diff --git a/tests/service.rs b/tests/service.rs index 7594ebf..6689c15 100644 --- a/tests/service.rs +++ b/tests/service.rs @@ -1,5 +1,6 @@ mod test; -use std::time::Duration; +use futures::future::join_all; +use std::{fmt, time::Duration}; use test::prelude::*; #[test] @@ -66,3 +67,113 @@ fn restart_after_ungraceful_shutdown_should_succeed() { client.verify_pod_condition(&pod, "Ready"); } } + +#[tokio::test(flavor = "multi_thread")] +async fn starting_and_stopping_100_pods_simultaneously_should_succeed() { + let mut client = KubeClient::new() + .await + .expect("Kubernetes client could not be created"); + client.timeouts.create = Duration::from_secs(60); + client.timeouts.delete = Duration::from_secs(60); + client.timeouts.verify_pod_condition = Duration::from_secs(60); + + setup_repository_async(&client) + .await + .expect("Repository could not be setup."); + + const NUM_PODS: u32 = 100; + + let node = client + .list_labeled::("kubernetes.io/arch=stackable-linux") + .await + .expect("List of Stackable nodes could not be retrieved") + .into_iter() + .next() + .expect("No Stackable node found"); + + let node_name = node.metadata.name.clone().expect("Node has no name"); + + let allocatable_pods = get_allocatable_pods(&node); + + // This assertion assumes that either the allocated pods are already + // subtracted from `allocatable_pods` (which is currently not the + // case) or that no other pods are started while testing. + assert!( + NUM_PODS <= allocatable_pods, + "The test case tries to create {num} pods but only {max} pods \ + are allocatable on the node {node_name}.", + num = NUM_PODS, + max = allocatable_pods, + node_name = node_name + ); + + let pod_spec = format!( + " + apiVersion: v1 + kind: Pod + metadata: + name: agent-service-integration-test-race-condition + spec: + containers: + - name: noop-service + image: noop-service:1.0.0 + command: + - noop-service-1.0.0/start.sh + tolerations: + - key: kubernetes.io/arch + operator: Equal + value: stackable-linux + nodeName: {node_name} + ", + node_name = node_name + ); + + let pod_specs = (0..NUM_PODS) + .map(|_| with_unique_name(&pod_spec)) + .collect::>(); + + let (pods, creation_errors) = + partition_results(join_all(pod_specs.iter().map(|spec| client.create::(spec))).await); + let pods_created = pods.len(); + + let (ready_successes, ready_errors) = partition_results( + join_all( + pods.iter() + .map(|pod| client.verify_pod_condition(pod, "Ready")), + ) + .await, + ); + let pods_ready = ready_successes.len(); + + let (deletion_successes, deletion_errors) = + partition_results(join_all(pods.into_iter().map(|pod| client.delete(pod))).await); + let pods_deleted = deletion_successes.len(); + + let mut errors = Vec::new(); + errors.extend(creation_errors); + errors.extend(ready_errors); + errors.extend(deletion_errors); + + if let Some(error) = errors.first() { + panic!( + "Pods: {created}/{total} created, {ready}/{created} ready, {deleted}/{created} deleted; Error: {error}", + total = NUM_PODS, + created = pods_created, + ready = pods_ready, + deleted = pods_deleted, + error = error + ); + } +} + +fn partition_results(results: Vec>) -> (Vec, Vec) +where + E: fmt::Debug, + T: fmt::Debug, +{ + let (successes, errors) = results.into_iter().partition::, _>(Result::is_ok); + let unwrapped_successes = successes.into_iter().map(Result::unwrap).collect(); + let unwrapped_errors = errors.into_iter().map(Result::unwrap_err).collect(); + + (unwrapped_successes, unwrapped_errors) +} diff --git a/tests/test/kube.rs b/tests/test/kube.rs index 9f9e6ca..2260b40 100644 --- a/tests/test/kube.rs +++ b/tests/test/kube.rs @@ -333,10 +333,6 @@ impl KubeClient { .any(|condition| condition.type_ == condition_type && condition.status == "True") }; - if is_condition_true(&pod) { - return Ok(()); - } - let timeout_secs = self.timeouts.verify_pod_condition.as_secs() as u32; let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); @@ -345,6 +341,12 @@ impl KubeClient { .timeout(timeout_secs); let mut stream = pods.watch(&lp, "0").await?.boxed(); + let pod = pods.get_status(&pod.name()).await?; + + if is_condition_true(&pod) { + return Ok(()); + } + while let Some(status) = stream.try_next().await? { if let WatchEvent::Modified(pod) = status { if is_condition_true(&pod) { @@ -442,3 +444,13 @@ pub fn get_node_taints(node: &Node) -> Vec { .and_then(|spec| spec.taints.clone()) .unwrap_or_else(Vec::new) } + +/// Returns the number of allocatable pods of the given node. +pub fn get_allocatable_pods(node: &Node) -> u32 { + node.status + .as_ref() + .and_then(|status| status.allocatable.as_ref()) + .and_then(|allocatable| allocatable.get("pods")) + .and_then(|allocatable_pods| allocatable_pods.0.parse().ok()) + .unwrap_or_default() +} diff --git a/tests/test/prelude.rs b/tests/test/prelude.rs index 2fc0ae8..401665b 100644 --- a/tests/test/prelude.rs +++ b/tests/test/prelude.rs @@ -1,6 +1,6 @@ pub use super::assertions::*; pub use super::kube::*; -pub use super::repository::setup_repository; +pub use super::repository::*; pub use super::temporary_resource::TemporaryResource; pub use indoc::{formatdoc, indoc}; diff --git a/tests/test/repository.rs b/tests/test/repository.rs index 009b7ef..ccfd9fe 100644 --- a/tests/test/repository.rs +++ b/tests/test/repository.rs @@ -1,10 +1,22 @@ -use super::prelude::TestKubeClient; -use indoc::indoc; +use super::prelude::{KubeClient, TestKubeClient}; +use anyhow::Result; use kube_derive::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +const REPO_SPEC: &str = " + apiVersion: stable.stackable.de/v1 + kind: Repository + metadata: + name: integration-test-repository + namespace: default + spec: + repo_type: StackableRepo + properties: + url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/ +"; + /// Specification of a Stackable repository #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] #[kube( @@ -25,16 +37,11 @@ pub enum RepoType { pub fn setup_repository(client: &TestKubeClient) { client.apply_crd(&Repository::crd()); + client.apply::(REPO_SPEC); +} - client.apply::(indoc! {" - apiVersion: stable.stackable.de/v1 - kind: Repository - metadata: - name: integration-test-repository - namespace: default - spec: - repo_type: StackableRepo - properties: - url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/ - "}); +pub async fn setup_repository_async(client: &KubeClient) -> Result<()> { + client.apply_crd(&Repository::crd()).await?; + client.apply::(REPO_SPEC).await?; + Ok(()) }