Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,10 @@ rstest = { workspace = true }

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dev-dependencies]
procfs = { version = "0.16", default-features = false }
rlimit = "0.10.1"

[target.'cfg(unix)'.dev-dependencies]
nix = { workspace = true, features = ["process", "signal", "user", "term"] }
rlimit = "0.10.1"
rand_pcg = "0.3"
xattr = { workspace = true }

Expand Down
80 changes: 52 additions & 28 deletions src/uu/split/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,11 @@ struct OutFile {
/// and [`n_chunks_by_line_round_robin`] functions.
type OutFiles = Vec<OutFile>;
trait ManageOutFiles {
fn instantiate_writer(
&mut self,
idx: usize,
settings: &Settings,
) -> UResult<&mut BufWriter<Box<dyn Write>>>;
/// Initialize a new set of output files
/// Each OutFile is generated with filename, while the writer for it could be
/// optional, to be instantiated later by the calling function as needed.
Expand Down Expand Up @@ -1194,44 +1199,63 @@ impl ManageOutFiles for OutFiles {
Ok(out_files)
}

fn get_writer(
fn instantiate_writer(
&mut self,
idx: usize,
settings: &Settings,
) -> UResult<&mut BufWriter<Box<dyn Write>>> {
if self[idx].maybe_writer.is_some() {
Ok(self[idx].maybe_writer.as_mut().unwrap())
} else {
// Writer was not instantiated upfront or was temporarily closed due to system resources constraints.
// Instantiate it and record for future use.
let mut count = 0;
// Use-case for doing multiple tries of closing fds:
// E.g. split running in parallel to other processes (e.g. another split) doing similar stuff,
// sharing the same limits. In this scenario, after closing one fd, the other process
// might "steal" the freed fd and open a file on its side. Then it would be beneficial
// if split would be able to close another fd before cancellation.
'loop1: loop {
let filename_to_open = self[idx].filename.as_str();
let file_to_open_is_new = self[idx].is_new;
let maybe_writer =
settings.instantiate_current_writer(self[idx].filename.as_str(), self[idx].is_new);
settings.instantiate_current_writer(filename_to_open, file_to_open_is_new);
if let Ok(writer) = maybe_writer {
self[idx].maybe_writer = Some(writer);
Ok(self[idx].maybe_writer.as_mut().unwrap())
} else if settings.filter.is_some() {
return Ok(self[idx].maybe_writer.as_mut().unwrap());
}

if settings.filter.is_some() {
// Propagate error if in `--filter` mode
Err(maybe_writer.err().unwrap().into())
} else {
// Could have hit system limit for open files.
// Try to close one previously instantiated writer first
for (i, out_file) in self.iter_mut().enumerate() {
if i != idx && out_file.maybe_writer.is_some() {
out_file.maybe_writer.as_mut().unwrap().flush()?;
out_file.maybe_writer = None;
out_file.is_new = false;
break;
}
return Err(maybe_writer.err().unwrap().into());
}

// Could have hit system limit for open files.
// Try to close one previously instantiated writer first
for (i, out_file) in self.iter_mut().enumerate() {
if i != idx && out_file.maybe_writer.is_some() {
out_file.maybe_writer.as_mut().unwrap().flush()?;
out_file.maybe_writer = None;
out_file.is_new = false;
count += 1;

// And then try to instantiate the writer again
continue 'loop1;
}
// And then try to instantiate the writer again
// If this fails - give up and propagate the error
self[idx].maybe_writer =
Some(settings.instantiate_current_writer(
self[idx].filename.as_str(),
self[idx].is_new,
)?);
Ok(self[idx].maybe_writer.as_mut().unwrap())
}

// If this fails - give up and propagate the error
uucore::show_error!("at file descriptor limit, but no file descriptor left to close. Closed {count} writers before.");
return Err(maybe_writer.err().unwrap().into());
}
}

/// Returns the writer for the output file at position `idx`.
///
/// When a writer is already stored for that entry it is handed back directly;
/// otherwise the call is forwarded to `instantiate_writer`, whose result
/// (writer or error) is returned unchanged.
fn get_writer(
    &mut self,
    idx: usize,
    settings: &Settings,
) -> UResult<&mut BufWriter<Box<dyn Write>>> {
    // Writer was not instantiated upfront or was temporarily closed due to
    // system resources constraints: instantiate it and record for future use.
    if self[idx].maybe_writer.is_none() {
        return self.instantiate_writer(idx, settings);
    }

    // The emptiness check above guarantees a writer is present here.
    Ok(self[idx].maybe_writer.as_mut().unwrap())
}
}
Expand Down
72 changes: 56 additions & 16 deletions tests/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
// For the full copyright and license information, please view the LICENSE
// file that was distributed with this source code.

//spell-checker: ignore (linux) rlimit prlimit coreutil ggroups uchild uncaptured scmd SHLVL canonicalized openpty winsize xpixel ypixel
//spell-checker: ignore (linux) rlimit prlimit coreutil ggroups uchild uncaptured scmd SHLVL canonicalized openpty
//spell-checker: ignore (linux) winsize xpixel ypixel setrlimit FSIZE

#![allow(dead_code)]

#[cfg(unix)]
use nix::pty::OpenptyResult;
use pretty_assertions::assert_eq;
#[cfg(any(target_os = "linux", target_os = "android"))]
use rlimit::prlimit;
#[cfg(unix)]
use rlimit::setrlimit;
#[cfg(feature = "sleep")]
use rstest::rstest;
#[cfg(unix)]
Expand All @@ -27,6 +28,8 @@ use std::os::fd::OwnedFd;
#[cfg(unix)]
use std::os::unix::fs::{symlink as symlink_dir, symlink as symlink_file, PermissionsExt};
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::fs::{symlink_dir, symlink_file};
Expand Down Expand Up @@ -1224,7 +1227,7 @@ pub struct UCommand {
stdout: Option<Stdio>,
stderr: Option<Stdio>,
bytes_into_stdin: Option<Vec<u8>>,
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg(unix)]
limits: Vec<(rlimit::Resource, u64, u64)>,
stderr_to_stdout: bool,
timeout: Option<Duration>,
Expand Down Expand Up @@ -1387,7 +1390,7 @@ impl UCommand {
self
}

#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg(unix)]
pub fn limit(
&mut self,
resource: rlimit::Resource,
Expand Down Expand Up @@ -1646,6 +1649,25 @@ impl UCommand {
command.stdin(pi_slave).stdout(po_slave).stderr(pe_slave);
}

#[cfg(unix)]
if !self.limits.is_empty() {
// just to be safe: move a copy of the limits list into the closure.
// this way the closure is fully self-contained.
let limits_copy = self.limits.clone();
let closure = move || -> Result<()> {
for &(resource, soft_limit, hard_limit) in &limits_copy {
setrlimit(resource, soft_limit, hard_limit)?;
}
Ok(())
};
// SAFETY: the closure is self-contained and doesn't do any memory
// writes that would need to be propagated back to the parent process.
// also, the closure doesn't access stdin, stdout and stderr.
unsafe {
command.pre_exec(closure);
}
}

(command, captured_stdout, captured_stderr, stdin_pty)
}

Expand All @@ -1660,17 +1682,6 @@ impl UCommand {

let child = command.spawn().unwrap();

#[cfg(any(target_os = "linux", target_os = "android"))]
for &(resource, soft_limit, hard_limit) in &self.limits {
prlimit(
child.id() as i32,
resource,
Some((soft_limit, hard_limit)),
None,
)
.unwrap();
}

let mut child = UChild::from(self, child, captured_stdout, captured_stderr, stdin_pty);

if let Some(input) = self.bytes_into_stdin.take() {
Expand Down Expand Up @@ -3706,4 +3717,33 @@ mod tests {
);
std::assert_eq!(String::from_utf8_lossy(out.stderr()), "");
}

/// Runs `sh` without any configured limits and expects both the soft and the
/// hard file-size limit to be reported as `unlimited`.
#[cfg(unix)]
#[test]
fn test_application_of_process_resource_limits_unlimited_file_size() {
    // Print the soft limit first, then the hard limit, one per line.
    let script = "ulimit -Sf; ulimit -Hf";
    let expected_stdout = "unlimited\nunlimited\n";

    let scenario = TestScenario::new("util");
    scenario
        .cmd("sh")
        .args(&["-c", script])
        .succeeds()
        .no_stderr()
        .stdout_is(expected_stdout);
}

/// Runs `sh` with soft/hard `FSIZE` limits set through `limit` and expects the
/// shell to report them as `8` and `16`.
#[cfg(unix)]
#[test]
fn test_application_of_process_resource_limits_limited_file_size() {
    // Presumably the size in bytes of one unit as reported by `ulimit -f`
    // (1024 on macOS, 512 elsewhere) - the expected output below relies on it.
    let block_bytes = if cfg!(target_os = "macos") { 1024 } else { 512 };
    let soft_limit = 8 * block_bytes;
    let hard_limit = 16 * block_bytes;

    // Print the soft limit first, then the hard limit, one per line.
    let script = "ulimit -Sf; ulimit -Hf";
    let expected_stdout = "8\n16\n";

    let scenario = TestScenario::new("util");
    scenario
        .cmd("sh")
        .args(&["-c", script])
        .limit(rlimit::Resource::FSIZE, soft_limit, hard_limit)
        .succeeds()
        .no_stderr()
        .stdout_is(expected_stdout);
}
}