Skip to content
39 changes: 32 additions & 7 deletions src/uu/sort/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use uucore::error::{FromIo, UResult};
use crate::{
GlobalSettings, Output, SortError,
chunks::{self, Chunk, RecycledChunk},
compare_by, open,
compare_by, fd_soft_limit, open,
tmp_dir::TmpDirWrapper,
};

Expand Down Expand Up @@ -62,6 +62,28 @@ fn replace_output_file_in_input_files(
Ok(())
}

/// Determine the effective merge batch size, enforcing a minimum and respecting the
/// file-descriptor soft limit after reserving stdio/output and a safety margin.
fn effective_merge_batch_size(settings: &GlobalSettings) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it needs more comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

const MIN_BATCH_SIZE: usize = 2;
const RESERVED_STDIO: usize = 3;
const RESERVED_OUTPUT: usize = 1;
const SAFETY_MARGIN: usize = 1;
let mut batch_size = settings.merge_batch_size.max(MIN_BATCH_SIZE);

if let Some(limit) = fd_soft_limit() {
let reserved = RESERVED_STDIO + RESERVED_OUTPUT + SAFETY_MARGIN;
let available_inputs = limit.saturating_sub(reserved);
if available_inputs >= MIN_BATCH_SIZE {
batch_size = batch_size.min(available_inputs);
} else {
batch_size = MIN_BATCH_SIZE;
}
}

batch_size
}

/// Merge pre-sorted `Box<dyn Read>`s.
///
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
Expand Down Expand Up @@ -94,18 +116,21 @@ pub fn merge_with_file_limit<
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
if files.len() <= settings.merge_batch_size {
let batch_size = effective_merge_batch_size(settings);
debug_assert!(batch_size >= 2);

if files.len() <= batch_size {
let merger = merge_without_limit(files, settings);
merger?.write_all(settings, output)
} else {
let mut temporary_files = vec![];
let mut batch = vec![];
let mut batch = Vec::with_capacity(batch_size);
for file in files {
batch.push(file);
if batch.len() >= settings.merge_batch_size {
assert_eq!(batch.len(), settings.merge_batch_size);
if batch.len() >= batch_size {
assert_eq!(batch.len(), batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;
batch = vec![];
batch = Vec::with_capacity(batch_size);

let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
Expand All @@ -115,7 +140,7 @@ pub fn merge_with_file_limit<
}
// Merge any remaining files that didn't get merged in a full batch above.
if !batch.is_empty() {
assert!(batch.len() < settings.merge_batch_size);
assert!(batch.len() < batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;

let mut tmp_file =
Expand Down
48 changes: 38 additions & 10 deletions src/uu/sort/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,13 +1073,27 @@ fn make_sort_mode_arg(mode: &'static str, short: char, help: String) -> Arg {

#[cfg(target_os = "linux")]
fn get_rlimit() -> UResult<usize> {
use nix::sys::resource::{Resource, getrlimit};
use nix::sys::resource::{RLIM_INFINITY, Resource, getrlimit};

getrlimit(Resource::RLIMIT_NOFILE)
.map(|(rlim_cur, _)| rlim_cur as usize)
let (rlim_cur, _rlim_max) = getrlimit(Resource::RLIMIT_NOFILE)
.map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))?;
if rlim_cur == RLIM_INFINITY {
return Err(UUsageError::new(2, translate!("sort-failed-fetch-rlimit")));
}
usize::try_from(rlim_cur)
.map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))
}

#[cfg(target_os = "linux")]
pub(crate) fn fd_soft_limit() -> Option<usize> {
get_rlimit().ok()
}

#[cfg(not(target_os = "linux"))]
pub(crate) fn fd_soft_limit() -> Option<usize> {
None
}

const STDIN_FILE: &str = "-";

/// Legacy `+POS1 [-POS2]` syntax is permitted unless `_POSIX2_VERSION` is in
Expand Down Expand Up @@ -1232,12 +1246,12 @@ fn default_merge_batch_size() -> usize {
#[cfg(target_os = "linux")]
{
// Adjust merge batch size dynamically based on available file descriptors.
match get_rlimit() {
Ok(limit) => {
match fd_soft_limit() {
Some(limit) => {
let usable_limit = limit.saturating_div(LINUX_BATCH_DIVISOR);
usable_limit.clamp(LINUX_BATCH_MIN, LINUX_BATCH_MAX)
}
Err(_) => 64,
None => 64,
}
}

Expand Down Expand Up @@ -1366,9 +1380,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
settings.threads = matches
.get_one::<String>(options::PARALLEL)
.map_or_else(|| "0".to_string(), String::from);
unsafe {
env::set_var("RAYON_NUM_THREADS", &settings.threads);
}
let num_threads = match settings.threads.parse::<usize>() {
Ok(0) | Err(_) => std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1),
Ok(n) => n,
};
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build_global();
}

if let Some(size_str) = matches.get_one::<String>(options::BUF_SIZE) {
Expand Down Expand Up @@ -1419,7 +1439,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {

translate!(
"sort-maximum-batch-size-rlimit",
"rlimit" => get_rlimit()?
"rlimit" => {
let Some(rlimit) = fd_soft_limit() else {
return Err(UUsageError::new(
2,
translate!("sort-failed-fetch-rlimit"),
));
};
rlimit
}
)
}
#[cfg(not(target_os = "linux"))]
Expand Down
Loading