From 89d981d2d99575aaa81d9e30856d36fb81bf79ee Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 8 Feb 2022 13:40:50 +0800 Subject: [PATCH 01/11] Enable periodic cleanup of work_dir directories in ballista executor --- ballista/rust/executor/Cargo.toml | 1 + .../rust/executor/executor_config_spec.toml | 18 ++++ ballista/rust/executor/src/main.rs | 84 ++++++++++++++++++- 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 1f7ac61a2071c..3ef470794eab4 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -47,6 +47,7 @@ tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } hyper = "0.14.4" parking_lot = "0.12" +chrono = { version = "0.4", default-features = false } [dev-dependencies] diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml index 1dd3de99012c1..34757a8100c95 100644 --- a/ballista/rust/executor/executor_config_spec.toml +++ b/ballista/rust/executor/executor_config_spec.toml @@ -78,3 +78,21 @@ name = "task_scheduling_policy" type = "ballista_core::config::TaskSchedulingPolicy" doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged" default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" + +[[param]] +name = "executor_cleanup_enable" +type = "bool" +doc = "Enable periodic cleanup of work_dir directories." +default = "true" + +[[param]] +name = "executor_cleanup_interval" +type = "u64" +doc = "Controls the interval in seconds , which the worker cleans up old job dirs on the local machine." +default = "1800" + +[[param]] +name = "executor_cleanup_ttl" +type = "i64" +doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600)" +default = "604800" \ No newline at end of file diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 2321ce338eb58..e6c1596d80daf 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -17,13 +17,17 @@ //! Ballista Rust executor binary. +use chrono::{DateTime, Duration, Utc}; use std::sync::Arc; +use std::time::Duration as Core_Duration; use anyhow::{Context, Result}; use arrow_flight::flight_service_server::FlightServiceServer; use ballista_executor::{execution_loop, executor_server}; -use log::info; +use log::{error, info}; use tempfile::TempDir; +use tokio::fs::ReadDir; +use tokio::{fs, time}; use tonic::transport::Server; use uuid::Uuid; @@ -112,6 +116,21 @@ async fn main() -> Result<()> { .context("Could not connect to scheduler")?; let scheduler_policy = opt.task_scheduling_policy; + let cleanup_ttl = opt.executor_cleanup_ttl; + + if opt.executor_cleanup_enable { + let interval = opt.executor_cleanup_interval; + let mut interval_time = time::interval(Core_Duration::from_secs(interval)); + tokio::spawn(async move { + loop { + interval_time.tick().await; + clean_shuffle_data_loop(&work_dir, cleanup_ttl) + .await + .unwrap(); + } + }); + } + match scheduler_policy { TaskSchedulingPolicy::PushStaged => { tokio::spawn(executor_server::startup( @@ -148,3 +167,66 @@ async fn main() -> Result<()> { Ok(()) } + +/// This function will scheduled periodically for cleanup executor +async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { + let mut dir = fs::read_dir(work_dir).await?; + let mut to_deleted = Vec::new(); + let mut need_delete_dir; + + while let Some(child) = dir.next_entry().await? { + let metadata = child.metadata().await?; + if metadata.is_dir() { + let dir = fs::read_dir(child.path()).await?; + match check_modified_time_in_dirs(vec![dir], seconds).await { + Ok(x) => match x { + true => { + need_delete_dir = child.path().into_os_string(); + to_deleted.push(need_delete_dir) + } + false => {} + }, + Err(e) => { + error!("Fail in clean_shuffle_data_loop {:?}", e) + } + } + } + } + info!( + "Executor work_dir {:?} not modified in {:?} seconds will be deleted ", + &to_deleted, seconds + ); + for del in to_deleted { + fs::remove_dir_all(del).await?; + } + Ok(()) +} + +/// Determines if a directory all files are older than cutoff seconds. +async fn check_modified_time_in_dirs( + mut vec: Vec, + seconds: i64, +) -> Result { + let cutoff = Utc::now() - Duration::seconds(seconds); + + while !vec.is_empty() { + let mut dir = vec.pop().unwrap(); + while let Some(child) = dir.next_entry().await? { + let meta = child.metadata().await?; + match meta.is_dir() { + true => { + let dir = fs::read_dir(child.path()).await?; + vec.push(dir); + } + false => { + let modified_time: DateTime = + meta.modified().map(chrono::DateTime::from).unwrap(); + if modified_time > cutoff { + return Ok(false); + } + } + } + } + } + Ok(true) +} From 574217623b726e754434bc0cedd2a7f2dcf6be26 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 8 Feb 2022 19:14:43 +0800 Subject: [PATCH 02/11] add ut. --- .../rust/executor/executor_config_spec.toml | 2 +- ballista/rust/executor/src/main.rs | 72 +++++++++++++++---- 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml index 34757a8100c95..d1416ba4c60b8 100644 --- a/ballista/rust/executor/executor_config_spec.toml +++ b/ballista/rust/executor/executor_config_spec.toml @@ -95,4 +95,4 @@ default = "1800" name = "executor_cleanup_ttl" type = "i64" doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600)" -default = "604800" \ No newline at end of file +default = "604800" diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index e6c1596d80daf..5106e818b90e2 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -168,28 +168,31 @@ async fn main() -> Result<()> { Ok(()) } -/// This function will scheduled periodically for cleanup executor +/// This function will scheduled periodically for cleanup executor. +/// Will only clean the dir under work_dir not include file async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { let mut dir = fs::read_dir(work_dir).await?; let mut to_deleted = Vec::new(); let mut need_delete_dir; - - while let Some(child) = dir.next_entry().await? { - let metadata = child.metadata().await?; - if metadata.is_dir() { - let dir = fs::read_dir(child.path()).await?; - match check_modified_time_in_dirs(vec![dir], seconds).await { - Ok(x) => match x { - true => { - need_delete_dir = child.path().into_os_string(); - to_deleted.push(need_delete_dir) + while let Some(child) = dir.next_entry().await.unwrap() { + if let Ok(metadata) = child.metadata().await { + if metadata.is_dir() { + let dir = fs::read_dir(child.path()).await?; + match check_modified_time_in_dirs(vec![dir], seconds).await { + Ok(x) => match x { + true => { + need_delete_dir = child.path().into_os_string(); + to_deleted.push(need_delete_dir) + } + false => {} + }, + Err(e) => { + error!("Fail in clean_shuffle_data_loop {:?}", e) } - false => {} - }, - Err(e) => { - error!("Fail in clean_shuffle_data_loop {:?}", e) } } + } else { + error!("can not get meta from file{:?}", child) } } info!( @@ -230,3 +233,42 @@ async fn check_modified_time_in_dirs( } Ok(true) } + +#[cfg(test)] +mod tests { + use crate::clean_shuffle_data_loop; + use std::fs; + use std::fs::File; + use std::io::Write; + use std::time::Duration; + use tempfile::TempDir; + + #[tokio::test] + async fn test_executor_clean_up() { + let work_dir = TempDir::new().unwrap().into_path(); + let job_dir = work_dir.as_path().join("job_id"); + let file_path = job_dir.as_path().join("tmp.csv"); + let data = "Jorge,2018-12-13T12:12:10.011Z\n\ + Andrew,2018-11-13T17:11:10.011Z"; + fs::create_dir(job_dir).unwrap(); + File::create(&file_path) + .expect("creating temp file") + .write_all(data.as_bytes()) + .expect("writing data"); + + let work_dir_clone = work_dir.clone(); + + let count1 = fs::read_dir(work_dir.clone()).unwrap().count(); + assert_eq!(count1, 1); + let mut handles = vec![]; + handles.push(tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(2)).await; + clean_shuffle_data_loop(work_dir_clone.to_str().unwrap(), 1) + .await + .unwrap(); + })); + futures::future::join_all(handles).await; + let count2 = fs::read_dir(work_dir.clone()).unwrap().count(); + assert_eq!(count2, 0); + } +} From f3718a44ad9163c6bcd55b17e20f9b79c19effb6 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 9 Feb 2022 14:24:34 +0800 Subject: [PATCH 03/11] fix error handle. --- ballista/rust/executor/src/main.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 5106e818b90e2..d9eef3b53bdd2 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -124,9 +124,12 @@ async fn main() -> Result<()> { tokio::spawn(async move { loop { interval_time.tick().await; - clean_shuffle_data_loop(&work_dir, cleanup_ttl) - .await - .unwrap(); + match clean_shuffle_data_loop(&work_dir, cleanup_ttl).await { + Err(e) => { + error!("Ballista executor fail to clean_shuffle_data {:?}", e) + } + _ => {} + } } }); } @@ -174,7 +177,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { let mut dir = fs::read_dir(work_dir).await?; let mut to_deleted = Vec::new(); let mut need_delete_dir; - while let Some(child) = dir.next_entry().await.unwrap() { + while let Some(child) = dir.next_entry().await? { if let Ok(metadata) = child.metadata().await { if metadata.is_dir() { let dir = fs::read_dir(child.path()).await?; From 4ac5f4786fd63e4e4dd31cbcb158688f87ddc738 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 9 Feb 2022 14:32:38 +0800 Subject: [PATCH 04/11] fix clippy --- ballista/rust/executor/src/main.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index d9eef3b53bdd2..899f8de3ae3ac 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -124,11 +124,8 @@ async fn main() -> Result<()> { tokio::spawn(async move { loop { interval_time.tick().await; - match clean_shuffle_data_loop(&work_dir, cleanup_ttl).await { - Err(e) => { - error!("Ballista executor fail to clean_shuffle_data {:?}", e) - } - _ => {} + if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl).await { + error!("Ballista executor fail to clean_shuffle_data {:?}", e) } } }); @@ -226,7 +223,7 @@ async fn check_modified_time_in_dirs( } false => { let modified_time: DateTime = - meta.modified().map(chrono::DateTime::from).unwrap(); + meta.modified().map(chrono::DateTime::from)?; if modified_time > cutoff { return Ok(false); } From e94f98a1b05fbae78a3065d990a058447ffb0048 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 9 Feb 2022 14:39:10 +0800 Subject: [PATCH 05/11] fix doc --- ballista/rust/executor/executor_config_spec.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml index d1416ba4c60b8..a4482264ce633 100644 --- a/ballista/rust/executor/executor_config_spec.toml +++ b/ballista/rust/executor/executor_config_spec.toml @@ -94,5 +94,5 @@ default = "1800" [[param]] name = "executor_cleanup_ttl" type = "i64" -doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600)" +doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained" default = "604800" From 7b151b9f31ad34f433533511527e599e9a4c1e88 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 15 Feb 2022 11:46:54 +0800 Subject: [PATCH 06/11] add annotation --- ballista/rust/executor/src/main.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 899f8de3ae3ac..386e9d2bbac6d 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -176,6 +176,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { let mut need_delete_dir; while let Some(child) = dir.next_entry().await? { if let Ok(metadata) = child.metadata().await { + // only delete the job dir if metadata.is_dir() { let dir = fs::read_dir(child.path()).await?; match check_modified_time_in_dirs(vec![dir], seconds).await { @@ -208,9 +209,9 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { /// Determines if a directory all files are older than cutoff seconds. async fn check_modified_time_in_dirs( mut vec: Vec, - seconds: i64, + ttl_seconds: i64, ) -> Result { - let cutoff = Utc::now() - Duration::seconds(seconds); + let cutoff = Utc::now() - Duration::seconds(ttl_seconds); while !vec.is_empty() { let mut dir = vec.pop().unwrap(); @@ -219,12 +220,14 @@ async fn check_modified_time_in_dirs( match meta.is_dir() { true => { let dir = fs::read_dir(child.path()).await?; + // check in next loop vec.push(dir); } false => { let modified_time: DateTime = meta.modified().map(chrono::DateTime::from)?; if modified_time > cutoff { + // if one file has been modified in ttl we won't delete the whole dir return Ok(false); } } From 464efe0f57bb8894a9ad871d584725680de1fc1c Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 16 Feb 2022 15:16:04 +0800 Subject: [PATCH 07/11] fix clippy --- ballista/rust/executor/src/main.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 386e9d2bbac6d..1b1a8ff531156 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -217,19 +217,16 @@ async fn check_modified_time_in_dirs( let mut dir = vec.pop().unwrap(); while let Some(child) = dir.next_entry().await? { let meta = child.metadata().await?; - match meta.is_dir() { - true => { - let dir = fs::read_dir(child.path()).await?; - // check in next loop - vec.push(dir); - } - false => { - let modified_time: DateTime = - meta.modified().map(chrono::DateTime::from)?; - if modified_time > cutoff { - // if one file has been modified in ttl we won't delete the whole dir - return Ok(false); - } + if meta.is_dir() { + let dir = fs::read_dir(child.path()).await?; + // check in next loop + vec.push(dir); + } else { + let modified_time: DateTime = + meta.modified().map(chrono::DateTime::from)?; + if modified_time > cutoff { + // if one file has been modified in ttl we won't delete the whole dir + return Ok(false); } } } From 7aac84ec16ee8ccb5fbeffb000578538e8bf478a Mon Sep 17 00:00:00 2001 From: Yang Jiang <37145547+Ted-Jiang@users.noreply.github.com> Date: Mon, 7 Mar 2022 15:19:58 +0800 Subject: [PATCH 08/11] Update ballista/rust/executor/src/main.rs fix info Co-authored-by: Kun Liu --- ballista/rust/executor/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 1b1a8ff531156..40013b045da06 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -197,7 +197,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { } } info!( - "Executor work_dir {:?} not modified in {:?} seconds will be deleted ", + "The work_dir {:?} that have not been modified for {:?} seconds will be deleted", &to_deleted, seconds ); for del in to_deleted { From b74770e562ebf385fa0d514360c53c91495087ff Mon Sep 17 00:00:00 2001 From: Yang Jiang <37145547+Ted-Jiang@users.noreply.github.com> Date: Mon, 7 Mar 2022 15:20:11 +0800 Subject: [PATCH 09/11] Update ballista/rust/executor/src/main.rs fix info Co-authored-by: Kun Liu --- ballista/rust/executor/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 40013b045da06..26486e9b2d642 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -193,7 +193,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> { } } } else { - error!("can not get meta from file{:?}", child) + error!("Can not get metadata from file: {:?}", child) } } info!( From ab6c41a4f496e4950c64ab1534cead643da10a10 Mon Sep 17 00:00:00 2001 From: Yang Jiang <37145547+Ted-Jiang@users.noreply.github.com> Date: Mon, 7 Mar 2022 15:28:24 +0800 Subject: [PATCH 10/11] Update ballista/rust/executor/src/main.rs fix Co-authored-by: Kun Liu --- ballista/rust/executor/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 26486e9b2d642..b7cd6270b6182 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -119,8 +119,7 @@ async fn main() -> Result<()> { let cleanup_ttl = opt.executor_cleanup_ttl; if opt.executor_cleanup_enable { - let interval = opt.executor_cleanup_interval; - let mut interval_time = time::interval(Core_Duration::from_secs(interval)); + let mut interval_time = time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval)); tokio::spawn(async move { loop { interval_time.tick().await; From 311f8ce4a2878b520314acee8b98073624044c57 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 7 Mar 2022 15:41:33 +0800 Subject: [PATCH 11/11] change config --- ballista/rust/executor/executor_config_spec.toml | 4 ++-- ballista/rust/executor/src/main.rs | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml index a4482264ce633..167ec20d2e4af 100644 --- a/ballista/rust/executor/executor_config_spec.toml +++ b/ballista/rust/executor/executor_config_spec.toml @@ -83,7 +83,7 @@ default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" name = "executor_cleanup_enable" type = "bool" doc = "Enable periodic cleanup of work_dir directories." -default = "true" +default = "false" [[param]] name = "executor_cleanup_interval" @@ -93,6 +93,6 @@ default = "1800" [[param]] name = "executor_cleanup_ttl" -type = "i64" +type = "u64" doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained" default = "604800" diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index b7cd6270b6182..b29ef40a0e425 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -119,11 +119,14 @@ async fn main() -> Result<()> { let cleanup_ttl = opt.executor_cleanup_ttl; if opt.executor_cleanup_enable { - let mut interval_time = time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval)); + let mut interval_time = + time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval)); tokio::spawn(async move { loop { interval_time.tick().await; - if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl).await { + if let Err(e) = + clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await + { error!("Ballista executor fail to clean_shuffle_data {:?}", e) } }