From a19068f89118f1521dcb9e2650ec56ee4e17b15b Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Fri, 21 May 2021 15:23:33 +0200
Subject: [PATCH 1/3] Add test
 no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_parallel

---
 tests/service.rs   | 39 +++++++++++++++++++++++++++++++++++++++
 tests/test/kube.rs | 10 ++++++----
 2 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/tests/service.rs b/tests/service.rs
index 7594ebf..e7f42e8 100644
--- a/tests/service.rs
+++ b/tests/service.rs
@@ -66,3 +66,42 @@ fn restart_after_ungraceful_shutdown_should_succeed() {
         client.verify_pod_condition(&pod, "Ready");
     }
 }
+
+// This test provokes race conditions but does not guarantee their
+// absence on success.
+#[test]
+fn no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_parallel() {
+    let mut client = TestKubeClient::new();
+    client.timeouts().verify_pod_condition = Duration::from_secs(120);
+
+    setup_repository(&client);
+
+    let pod_spec = indoc! {"
+        apiVersion: v1
+        kind: Pod
+        metadata:
+          name: agent-service-integration-test-race-condition
+        spec:
+          containers:
+            - name: noop-service
+              image: noop-service:1.0.0
+              command:
+                - noop-service-1.0.0/start.sh
+          tolerations:
+            - key: kubernetes.io/arch
+              operator: Equal
+              value: stackable-linux
+    "};
+
+    let mut pods = Vec::new();
+
+    for _ in 1..=100 {
+        pods.push(TemporaryResource::new(&client, &with_unique_name(pod_spec)));
+    }
+
+    for pod in &pods {
+        client.verify_pod_condition(&pod, "Ready");
+    }
+
+    pods.clear();
+}
diff --git a/tests/test/kube.rs b/tests/test/kube.rs
index 9f9e6ca..b0b8ea1 100644
--- a/tests/test/kube.rs
+++ b/tests/test/kube.rs
@@ -333,10 +333,6 @@ impl KubeClient {
             .any(|condition| condition.type_ == condition_type && condition.status == "True")
         };
 
-        if is_condition_true(&pod) {
-            return Ok(());
-        }
-
        let timeout_secs = self.timeouts.verify_pod_condition.as_secs() as u32;
 
         let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
@@ -345,6 +341,12 @@ impl KubeClient {
             .timeout(timeout_secs);
         let mut stream = pods.watch(&lp, "0").await?.boxed();
 
+        let pod = pods.get_status(&pod.name()).await?;
+
+        if is_condition_true(&pod) {
+            return Ok(());
+        }
+
         while let Some(status) = stream.try_next().await? {
             if let WatchEvent::Modified(pod) = status {
                 if is_condition_true(&pod) {

From 031bd0ff8230fa18ab3a0877ebf956eee09655a5 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Tue, 25 May 2021 14:32:08 +0200
Subject: [PATCH 2/3] Check node configuration before spawning a lot of pods

---
 tests/service.rs | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/tests/service.rs b/tests/service.rs
index e7f42e8..57912dc 100644
--- a/tests/service.rs
+++ b/tests/service.rs
@@ -76,6 +76,24 @@ fn no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_paral
 
     setup_repository(&client);
 
+    const NUM_PODS: u32 = 100;
+
+    let max_pods = client
+        .list_labeled::<Node>("kubernetes.io/arch=stackable-linux")
+        .iter()
+        .filter_map(|node| node.status.as_ref())
+        .filter_map(|status| status.allocatable.as_ref())
+        .filter_map(|allocatable| allocatable.get("pods"))
+        .filter_map(|allocatable_pods| allocatable_pods.0.parse::<u32>().ok())
+        .sum::<u32>();
+
+    assert!(
+        NUM_PODS <= max_pods,
+        "The test case tries to create {} pods but only {} pods are allocatable on the nodes.",
+        NUM_PODS,
+        max_pods
+    );
+
     let pod_spec = indoc! {"
         apiVersion: v1
         kind: Pod
         metadata:
           name: agent-service-integration-test-race-condition
         spec:
           containers:
             - name: noop-service
               image: noop-service:1.0.0
               command:
                 - noop-service-1.0.0/start.sh
           tolerations:
             - key: kubernetes.io/arch
               operator: Equal
               value: stackable-linux
     "};
@@ -95,7 +113,7 @@ fn no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_paral
 
     let mut pods = Vec::new();
 
-    for _ in 1..=100 {
+    for _ in 1..=NUM_PODS {
         pods.push(TemporaryResource::new(&client, &with_unique_name(pod_spec)));
     }

From 40fe31fe6409d9fc038cbc7f5288b3ed64a2e131 Mon Sep 17 00:00:00 2001
From: Siegfried Weber
Date: Wed, 26 May 2021 19:33:50 +0200
Subject: [PATCH 3/3] Rename test case
 no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_parallel
 to starting_and_stopping_100_pods_simultaneously_should_succeed and make it
 asynchronous

---
 Cargo.lock               | 12 ++++++
 Cargo.toml               |  2 +-
 tests/service.rs         | 81 ++++++++++++++++++++++++++++++----------
 tests/test/kube.rs       | 10 +++++
 tests/test/prelude.rs    |  2 +-
 tests/test/repository.rs | 33 +++++++++-------
 6 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b93e6fd..b4018c2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1394,6 +1394,7 @@ dependencies = [
  "once_cell",
  "pin-project-lite",
  "signal-hook-registry",
+ "tokio-macros",
  "winapi",
 ]
 
@@ -1407,6 +1408,17 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-macros"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "tokio-native-tls"
 version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index 017a290..56b5cc2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,5 +19,5 @@ serde = "1"
 serde_json = "1"
 serde_yaml = "0.8"
 spectral = "0.6"
-tokio = { version = "1", features = ["rt-multi-thread"] }
+tokio = { version = "1.6", features = ["macros", "rt-multi-thread"] }
 uuid = { version = "0.8", features = ["v4"] }
diff --git a/tests/service.rs b/tests/service.rs
index 57912dc..a25886b 100644
--- a/tests/service.rs
+++ b/tests/service.rs
@@ -1,5 +1,6 @@
 mod test;
-use std::time::Duration;
+use futures::future::join_all;
+use std::{fmt, time::Duration};
 use test::prelude::*;
 
 #[test]
@@ -67,25 +68,28 @@ fn restart_after_ungraceful_shutdown_should_succeed() {
     }
 }
 
-// This test provokes race conditions but does not guarantee their
-// absence on success.
-#[test]
-fn no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_parallel() {
-    let mut client = TestKubeClient::new();
-    client.timeouts().verify_pod_condition = Duration::from_secs(120);
+#[tokio::test(flavor = "multi_thread")]
+async fn starting_and_stopping_100_pods_simultaneously_should_succeed() {
+    let mut client = KubeClient::new()
+        .await
+        .expect("Kubernetes client could not be created");
+    client.timeouts.create = Duration::from_secs(60);
+    client.timeouts.delete = Duration::from_secs(60);
+    client.timeouts.verify_pod_condition = Duration::from_secs(60);
 
-    setup_repository(&client);
+    setup_repository_async(&client)
+        .await
+        .expect("Repository could not be set up.");
 
     const NUM_PODS: u32 = 100;
 
     let max_pods = client
         .list_labeled::<Node>("kubernetes.io/arch=stackable-linux")
+        .await
+        .expect("List of Stackable nodes could not be retrieved")
         .iter()
-        .filter_map(|node| node.status.as_ref())
-        .filter_map(|status| status.allocatable.as_ref())
-        .filter_map(|allocatable| allocatable.get("pods"))
-        .filter_map(|allocatable_pods| allocatable_pods.0.parse::<u32>().ok())
-        .sum::<u32>();
+        .map(get_allocatable_pods)
+        .sum();
 
     assert!(
         NUM_PODS <= max_pods,
         "The test case tries to create {} pods but only {} pods are allocatable on the nodes.",
         NUM_PODS,
         max_pods
     );
@@ -111,15 +115,52 @@ fn no_race_conditions_should_occur_if_many_pods_are_started_and_stopped_in_paral
               value: stackable-linux
     "};
 
-    let mut pods = Vec::new();
+    let pod_specs = (0..NUM_PODS)
+        .map(|_| with_unique_name(pod_spec))
+        .collect::<Vec<_>>();
 
-    for _ in 1..=NUM_PODS {
-        pods.push(TemporaryResource::new(&client, &with_unique_name(pod_spec)));
-    }
+    let (pods, creation_errors) =
+        partition_results(join_all(pod_specs.iter().map(|spec| client.create::<Pod>(spec))).await);
+    let pods_created = pods.len();
 
-    for pod in &pods {
-        client.verify_pod_condition(&pod, "Ready");
+    let (ready_successes, ready_errors) = partition_results(
+        join_all(
+            pods.iter()
+                .map(|pod| client.verify_pod_condition(pod, "Ready")),
+        )
+        .await,
+    );
+    let pods_ready = ready_successes.len();
+
+    let (deletion_successes, deletion_errors) =
+        partition_results(join_all(pods.into_iter().map(|pod| client.delete(pod))).await);
+    let pods_deleted = deletion_successes.len();
+
+    let mut errors = Vec::new();
+    errors.extend(creation_errors);
+    errors.extend(ready_errors);
+    errors.extend(deletion_errors);
+
+    if let Some(error) = errors.first() {
+        panic!(
+            "Pods: {created}/{total} created, {ready}/{created} ready, {deleted}/{created} deleted; Error: {error}",
+            total = NUM_PODS,
+            created = pods_created,
+            ready = pods_ready,
+            deleted = pods_deleted,
+            error = error
+        );
     }
+}
+
+fn partition_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>)
+where
+    E: fmt::Debug,
+    T: fmt::Debug,
+{
+    let (successes, errors) = results.into_iter().partition::<Vec<_>, _>(Result::is_ok);
+    let unwrapped_successes = successes.into_iter().map(Result::unwrap).collect();
+    let unwrapped_errors = errors.into_iter().map(Result::unwrap_err).collect();
 
-    pods.clear();
+    (unwrapped_successes, unwrapped_errors)
 }
diff --git a/tests/test/kube.rs b/tests/test/kube.rs
index b0b8ea1..2260b40 100644
--- a/tests/test/kube.rs
+++ b/tests/test/kube.rs
@@ -444,3 +444,13 @@ pub fn get_node_taints(node: &Node) -> Vec<Taint> {
         .and_then(|spec| spec.taints.clone())
         .unwrap_or_else(Vec::new)
 }
+
+/// Returns the number of allocatable pods of the given node.
+pub fn get_allocatable_pods(node: &Node) -> u32 {
+    node.status
+        .as_ref()
+        .and_then(|status| status.allocatable.as_ref())
+        .and_then(|allocatable| allocatable.get("pods"))
+        .and_then(|allocatable_pods| allocatable_pods.0.parse().ok())
+        .unwrap_or_default()
+}
diff --git a/tests/test/prelude.rs b/tests/test/prelude.rs
index 2fc0ae8..401665b 100644
--- a/tests/test/prelude.rs
+++ b/tests/test/prelude.rs
@@ -1,6 +1,6 @@
 pub use super::assertions::*;
 pub use super::kube::*;
-pub use super::repository::setup_repository;
+pub use super::repository::*;
 pub use super::temporary_resource::TemporaryResource;
 
 pub use indoc::{formatdoc, indoc};
diff --git a/tests/test/repository.rs b/tests/test/repository.rs
index 009b7ef..ccfd9fe 100644
--- a/tests/test/repository.rs
+++ b/tests/test/repository.rs
@@ -1,10 +1,22 @@
-use super::prelude::TestKubeClient;
-use indoc::indoc;
+use super::prelude::{KubeClient, TestKubeClient};
+use anyhow::Result;
 use kube_derive::CustomResource;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
+const REPO_SPEC: &str = "
+    apiVersion: stable.stackable.de/v1
+    kind: Repository
+    metadata:
+      name: integration-test-repository
+      namespace: default
+    spec:
+      repo_type: StackableRepo
+      properties:
+        url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/
+";
+
 /// Specification of a Stackable repository
 #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
 #[kube(
@@ -25,16 +37,11 @@ pub enum RepoType {
 
 pub fn setup_repository(client: &TestKubeClient) {
     client.apply_crd(&Repository::crd());
+    client.apply::<Repository>(REPO_SPEC);
+}
 
-    client.apply::<Repository>(indoc! {"
-        apiVersion: stable.stackable.de/v1
-        kind: Repository
-        metadata:
-          name: integration-test-repository
-          namespace: default
-        spec:
-          repo_type: StackableRepo
-          properties:
-            url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/
-    "});
+pub async fn setup_repository_async(client: &KubeClient) -> Result<()> {
+    client.apply_crd(&Repository::crd()).await?;
+    client.apply::<Repository>(REPO_SPEC).await?;
+    Ok(())
 }
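
Note on patch 1: the essential change in verify_pod_condition is the ordering. The old code checked the pod's current condition first and started the watch afterwards, so a pod that became Ready between the two steps was never observed and the test hung until the timeout. The fix subscribes first and only then fetches the current status. The following standalone sketch condenses that pattern using the same kube-crate calls that appear in the diff (Api::watch, Api::get_status, ListParams, WatchEvent); the function name wait_until_ready, the is_ready predicate, and the 60-second timeout are illustrative stand-ins, not part of the actual code base.

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{Api, ListParams, WatchEvent};

// Illustrative predicate; the real code inspects status.conditions for a
// configurable condition type in the same way.
fn is_ready(pod: &Pod) -> bool {
    pod.status
        .as_ref()
        .and_then(|status| status.conditions.as_ref())
        .map(|conditions| {
            conditions
                .iter()
                .any(|c| c.type_ == "Ready" && c.status == "True")
        })
        .unwrap_or(false)
}

async fn wait_until_ready(pods: &Api<Pod>, name: &str) -> anyhow::Result<()> {
    let lp = ListParams::default()
        .fields(&format!("metadata.name={}", name))
        .timeout(60);

    // 1. Subscribe first, so a condition change that happens while the
    //    current status is being fetched still shows up in the stream.
    let mut stream = pods.watch(&lp, "0").await?.boxed();

    // 2. Only then inspect the current state; the pod may already be ready.
    if is_ready(&pods.get_status(name).await?) {
        return Ok(());
    }

    // 3. Otherwise wait for a modification that makes the condition true.
    while let Some(event) = stream.try_next().await? {
        if let WatchEvent::Modified(pod) = event {
            if is_ready(&pod) {
                return Ok(());
            }
        }
    }
    anyhow::bail!("pod {} did not become ready before the watch timed out", name)
}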
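
Note on patch 3: partition_results is what lets the test drive all 100 pods through create, verify, and delete even when some of them fail, and only panic at the end with aggregated counts. A minimal, self-contained usage example follows; the pod names and the error text are made up, and the helper is reproduced verbatim from tests/service.rs above.

use std::fmt;

// The helper exactly as introduced by patch 3.
fn partition_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>)
where
    E: fmt::Debug,
    T: fmt::Debug,
{
    let (successes, errors) = results.into_iter().partition::<Vec<_>, _>(Result::is_ok);
    let unwrapped_successes = successes.into_iter().map(Result::unwrap).collect();
    let unwrapped_errors = errors.into_iter().map(Result::unwrap_err).collect();

    (unwrapped_successes, unwrapped_errors)
}

fn main() {
    // Hypothetical outcome of three parallel pod creations: two succeed,
    // one is rejected.
    let results: Vec<Result<&str, String>> = vec![
        Ok("pod-0"),
        Err("admission webhook denied the request".to_string()),
        Ok("pod-2"),
    ];

    let (created, errors) = partition_results(results);

    // Successes and failures are processed separately: the test keeps
    // going with the pods that were created and reports errors at the end.
    assert_eq!(created, vec!["pod-0", "pod-2"]);
    assert_eq!(errors.len(), 1);
}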