1 change: 1 addition & 0 deletions ballista/rust/executor/Cargo.toml
@@ -47,6 +47,7 @@ tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
parking_lot = "0.12"
chrono = { version = "0.4", default-features = false }

[dev-dependencies]

18 changes: 18 additions & 0 deletions ballista/rust/executor/executor_config_spec.toml
@@ -78,3 +78,21 @@ name = "task_scheduling_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"

[[param]]
name = "executor_cleanup_enable"
type = "bool"
doc = "Enable periodic cleanup of work_dir directories."
default = "false"

[[param]]
name = "executor_cleanup_interval"
type = "u64"
doc = "Controls the interval in seconds , which the worker cleans up old job dirs on the local machine."
default = "1800"

[[param]]
name = "executor_cleanup_ttl"
type = "u64"
doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained"
default = "604800"
128 changes: 127 additions & 1 deletion ballista/rust/executor/src/main.rs
@@ -17,13 +17,17 @@

//! Ballista Rust executor binary.

use chrono::{DateTime, Duration, Utc};
use std::sync::Arc;
use std::time::Duration as StdDuration;

use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_executor::{execution_loop, executor_server};
use log::info;
use log::{error, info};
use tempfile::TempDir;
use tokio::fs::ReadDir;
use tokio::{fs, time};
use tonic::transport::Server;
use uuid::Uuid;

Expand Down Expand Up @@ -112,6 +116,23 @@ async fn main() -> Result<()> {
.context("Could not connect to scheduler")?;

let scheduler_policy = opt.task_scheduling_policy;
let cleanup_ttl = opt.executor_cleanup_ttl;
alamb (Contributor) commented:

I wonder if it might be possible to use NamedTempFile or some other struct from tempfile (already a dependency): https://crates.io/crates/tempfile

There are at least two benefits:

  1. The files are deleted immediately after they are no longer used, so the required intermediate disk space is minimized
  2. They can't be accidentally dropped while still in use (which perhaps affects long running queries)

I did something similar in DataFusion here: #1680
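
For context, a minimal sketch of the auto-cleanup behavior NamedTempFile provides (illustrative only, not what this PR implements; the function name is made up):

use std::io::Write;
use tempfile::NamedTempFile;

fn write_intermediate() -> std::io::Result<()> {
    // Created in the OS temp dir; the file is removed automatically when
    // `file` is dropped, so no periodic sweep is needed. The trade-off is
    // that the path is only valid while the handle is alive, which is the
    // retry concern raised in the reply below.
    let mut file = NamedTempFile::new()?;
    writeln!(file, "intermediate shuffle data")?;
    println!("wrote {:?}", file.path());
    Ok(())
}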

Member Author replied:

@alamb Thanks for your advice 😊!
IMHO, if a job has 3 stages, stage 2 reads stage 1's input and then deletes those files; if a stage 2 task then fails, the Ballista scheduler will start a task to reload stage 1's input. I think using NamedTempFile would cause some trouble and complexity here.
We need to keep the files for task recovery and stage retry (like Spark), so I decided that if none of the files under a job_dir have been modified within the TTL, we can safely delete it.
If I am not right, please correct me 🙈

alamb (Contributor) replied:

I don't honestly know enough about Ballista and its executor to know.

What do you think, @yahoNanJing and @liukun4515?


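    // Periodically sweep work_dir for job directories whose files have all outlived the TTL.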
    if opt.executor_cleanup_enable {
        let mut interval_time =
            time::interval(StdDuration::from_secs(opt.executor_cleanup_interval));
        tokio::spawn(async move {
            loop {
                interval_time.tick().await;
                if let Err(e) =
                    clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
                {
                    error!("Ballista executor failed to clean shuffle data: {:?}", e)
                }
            }
        });
    }

    match scheduler_policy {
        TaskSchedulingPolicy::PushStaged => {
            tokio::spawn(executor_server::startup(
@@ -148,3 +169,108 @@ async fn main() -> Result<()> {

    Ok(())
}

/// Runs periodically to clean up the executor's work_dir.
/// Only job directories under work_dir are removed; top-level files are left in place.
async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
    let mut dir = fs::read_dir(work_dir).await?;
    let mut to_delete = Vec::new();
    while let Some(child) = dir.next_entry().await? {
        if let Ok(metadata) = child.metadata().await {
            // only job directories are candidates for deletion
            if metadata.is_dir() {
                let dir = fs::read_dir(child.path()).await?;
                match check_modified_time_in_dirs(vec![dir], seconds).await {
                    Ok(true) => to_delete.push(child.path().into_os_string()),
                    Ok(false) => {}
                    Err(e) => {
                        error!("Failed in clean_shuffle_data_loop: {:?}", e)
                    }
                }
            }
        } else {
            error!("Cannot get metadata for entry: {:?}", child)
        }
    }
    info!(
        "The job dirs {:?} have not been modified for {:?} seconds and will be deleted",
        &to_delete, seconds
    );
    for del in to_delete {
        fs::remove_dir_all(del).await?;
    }
    Ok(())
}

/// Determines whether every file under the given directories is older than the
/// TTL cutoff, in seconds.
async fn check_modified_time_in_dirs(
    mut vec: Vec<ReadDir>,
    ttl_seconds: i64,
) -> Result<bool> {
    let cutoff = Utc::now() - Duration::seconds(ttl_seconds);

    // Iterative depth-first traversal: `vec` acts as a stack of directories to scan.
    while let Some(mut dir) = vec.pop() {
        while let Some(child) = dir.next_entry().await? {
            let meta = child.metadata().await?;
            if meta.is_dir() {
                let dir = fs::read_dir(child.path()).await?;
                // push onto the stack and check in a later iteration
                vec.push(dir);
            } else {
                let modified_time: DateTime<Utc> =
                    meta.modified().map(chrono::DateTime::from)?;
                if modified_time > cutoff {
                    // a file was modified within the TTL, so keep the whole dir
                    return Ok(false);
                }
            }
        }
    }
    Ok(true)
}

#[cfg(test)]
mod tests {
    use crate::clean_shuffle_data_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_executor_clean_up() {
        let work_dir = TempDir::new().unwrap().into_path();
        let job_dir = work_dir.as_path().join("job_id");
        let file_path = job_dir.as_path().join("tmp.csv");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
                    Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");

        let work_dir_clone = work_dir.clone();

        let count1 = fs::read_dir(&work_dir).unwrap().count();
        assert_eq!(count1, 1);
        let mut handles = vec![];
        handles.push(tokio::spawn(async move {
            // Sleep past the 1-second TTL so the job dir qualifies for deletion.
            tokio::time::sleep(Duration::from_secs(2)).await;
            clean_shuffle_data_loop(work_dir_clone.to_str().unwrap(), 1)
                .await
                .unwrap();
        }));
        futures::future::join_all(handles).await;
        let count2 = fs::read_dir(&work_dir).unwrap().count();
        assert_eq!(count2, 0);
    }
}
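
To run just this test (assuming the executor crate's package name is ballista-executor, following the repo layout): cargo test -p ballista-executor test_executor_clean_up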