diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 1f7ac61a2071c..3ef470794eab4 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -47,6 +47,7 @@ tonic = "0.6"
 uuid = { version = "0.8", features = ["v4"] }
 hyper = "0.14.4"
 parking_lot = "0.12"
+chrono = { version = "0.4", default-features = false }
 
 [dev-dependencies]
 
diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml
index 1dd3de99012c1..167ec20d2e4af 100644
--- a/ballista/rust/executor/executor_config_spec.toml
+++ b/ballista/rust/executor/executor_config_spec.toml
@@ -78,3 +78,21 @@ name = "task_scheduling_policy"
 type = "ballista_core::config::TaskSchedulingPolicy"
 doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
 default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
+
+[[param]]
+name = "executor_cleanup_enable"
+type = "bool"
+doc = "Enable periodic cleanup of job directories under work_dir."
+default = "false"
+
+[[param]]
+name = "executor_cleanup_interval"
+type = "u64"
+doc = "Interval in seconds at which the executor cleans up old job directories on the local machine."
+default = "1800"
+
+[[param]]
+name = "executor_cleanup_ttl"
+type = "u64"
+doc = "Number of seconds to retain job directories on each executor after a job finishes, i.e. how long the resulting data is kept. Default: 604800 (7 days, 7 * 24 * 3600)."
+default = "604800"
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 2321ce338eb58..b29ef40a0e425 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -17,13 +17,17 @@
 
 //! Ballista Rust executor binary.
 
+use chrono::{DateTime, Duration, Utc};
 use std::sync::Arc;
+use std::time::Duration as Core_Duration;
 
 use anyhow::{Context, Result};
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_executor::{execution_loop, executor_server};
-use log::info;
+use log::{error, info};
 use tempfile::TempDir;
+use tokio::fs::ReadDir;
+use tokio::{fs, time};
 use tonic::transport::Server;
 use uuid::Uuid;
 
@@ -112,6 +116,23 @@ async fn main() -> Result<()> {
         .context("Could not connect to scheduler")?;
 
     let scheduler_policy = opt.task_scheduling_policy;
+    let cleanup_ttl = opt.executor_cleanup_ttl;
+
+    if opt.executor_cleanup_enable {
+        let mut interval_time =
+            time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+        tokio::spawn(async move {
+            loop {
+                interval_time.tick().await;
+                if let Err(e) =
+                    clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
+                {
+                    error!("Ballista executor failed to clean up shuffle data: {:?}", e)
+                }
+            }
+        });
+    }
+
     match scheduler_policy {
         TaskSchedulingPolicy::PushStaged => {
             tokio::spawn(executor_server::startup(
@@ -148,3 +169,108 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+
+/// Runs periodically (when executor_cleanup_enable is set) to clean up the executor's work_dir.
+/// Only job directories under work_dir are removed; plain files directly under work_dir are kept.
+async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_deleted = Vec::new();
+    let mut need_delete_dir;
+    while let Some(child) = dir.next_entry().await? {
+        if let Ok(metadata) = child.metadata().await {
+            // only delete job directories, not loose files under work_dir
+            if metadata.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                match check_modified_time_in_dirs(vec![dir], seconds).await {
+                    Ok(x) => match x {
+                        true => {
+                            need_delete_dir = child.path().into_os_string();
+                            to_deleted.push(need_delete_dir)
+                        }
+                        false => {}
+                    },
+                    Err(e) => {
+                        error!("Failure in clean_shuffle_data_loop: {:?}", e)
+                    }
+                }
+            }
+        } else {
+            error!("Could not get metadata for file: {:?}", child)
+        }
+    }
+    info!(
+        "The job directories {:?} have not been modified for {:?} seconds and will be deleted",
+        &to_deleted, seconds
+    );
+    for del in to_deleted {
+        fs::remove_dir_all(del).await?;
+    }
+    Ok(())
+}
+
+/// Determines whether all files under the given directories are older than the TTL cutoff.
+async fn check_modified_time_in_dirs(
+    mut vec: Vec<ReadDir>,
+    ttl_seconds: i64,
+) -> Result<bool> {
+    let cutoff = Utc::now() - Duration::seconds(ttl_seconds);
+
+    while !vec.is_empty() {
+        let mut dir = vec.pop().unwrap();
+        while let Some(child) = dir.next_entry().await? {
+            let meta = child.metadata().await?;
+            if meta.is_dir() {
+                let dir = fs::read_dir(child.path()).await?;
+                // push the nested directory and check it in a later iteration
+                vec.push(dir);
+            } else {
+                let modified_time: DateTime<Utc> =
+                    meta.modified().map(chrono::DateTime::from)?;
+                if modified_time > cutoff {
+                    // a file was modified within the TTL, so keep the whole dir
+                    return Ok(false);
+                }
+            }
+        }
+    }
+    Ok(true)
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::clean_shuffle_data_loop;
+    use std::fs;
+    use std::fs::File;
+    use std::io::Write;
+    use std::time::Duration;
+    use tempfile::TempDir;
+
+    #[tokio::test]
+    async fn test_executor_clean_up() {
+        let work_dir = TempDir::new().unwrap().into_path();
+        let job_dir = work_dir.as_path().join("job_id");
+        let file_path = job_dir.as_path().join("tmp.csv");
+        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
+            Andrew,2018-11-13T17:11:10.011Z";
+        fs::create_dir(job_dir).unwrap();
+        File::create(&file_path)
+            .expect("creating temp file")
+            .write_all(data.as_bytes())
+            .expect("writing data");
+
+        let work_dir_clone = work_dir.clone();
+
+        let count1 = fs::read_dir(work_dir.clone()).unwrap().count();
+        assert_eq!(count1, 1);
+        let mut handles = vec![];
+        handles.push(tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_secs(2)).await;
+            clean_shuffle_data_loop(work_dir_clone.to_str().unwrap(), 1)
+                .await
+                .unwrap();
+        }));
+        futures::future::join_all(handles).await;
+        let count2 = fs::read_dir(work_dir.clone()).unwrap().count();
+        assert_eq!(count2, 0);
+    }
+}
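
For reference, the TTL check in this patch boils down to converting the `SystemTime` returned by `Metadata::modified()` into a `chrono::DateTime<Utc>` and comparing it against a single precomputed cutoff, `Utc::now() - Duration::seconds(ttl)`. The standalone sketch below illustrates that conversion and comparison with the same chrono APIs; it is not part of the patch, and the `is_older_than_ttl` helper name and the example path are purely illustrative assumptions.

```rust
use std::fs;
use std::io;
use std::path::Path;

use chrono::{DateTime, Duration, Utc};

/// Illustrative helper (not part of the patch): returns true when `path`'s
/// modification time is older than `ttl_seconds`, mirroring the cutoff logic
/// in `check_modified_time_in_dirs`.
fn is_older_than_ttl(path: &Path, ttl_seconds: i64) -> io::Result<bool> {
    // Cutoff timestamp: "now" minus the TTL.
    let cutoff = Utc::now() - Duration::seconds(ttl_seconds);
    // Metadata::modified() yields a std::time::SystemTime, which chrono can
    // convert directly into a DateTime<Utc>.
    let modified: DateTime<Utc> = fs::metadata(path)?.modified()?.into();
    Ok(modified < cutoff)
}

fn main() {
    // Example path only; point this at a real shuffle file when experimenting.
    let path = Path::new("/tmp/some_shuffle_file");
    match is_older_than_ttl(path, 604_800) {
        Ok(true) => println!("{} is older than the TTL; eligible for cleanup", path.display()),
        Ok(false) => println!("{} was modified within the TTL window", path.display()),
        Err(e) => eprintln!("could not stat {}: {}", path.display(), e),
    }
}
```

Computing the cutoff once per sweep, as the patch does, avoids re-reading the clock for every file and keeps the rule simple: any file modified after the cutoff keeps its whole job directory alive.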