diff --git a/src/uu/sort/src/check.rs b/src/uu/sort/src/check.rs index dbf598574fe..a41a3e35d6d 100644 --- a/src/uu/sort/src/check.rs +++ b/src/uu/sort/src/check.rs @@ -107,7 +107,7 @@ fn reader( ) -> UResult<()> { let mut carry_over = vec![]; for recycled_chunk in receiver { - let should_continue = chunks::read( + let progress = chunks::read( sender, recycled_chunk, None, @@ -117,7 +117,7 @@ fn reader( settings.line_ending.into(), settings, )?; - if !should_continue { + if matches!(progress, chunks::ReadProgress::Finished) { break; } } diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs index af10a008844..d80cc87c137 100644 --- a/src/uu/sort/src/chunks.rs +++ b/src/uu/sort/src/chunks.rs @@ -5,6 +5,8 @@ //! Utilities for reading files as chunks. +// spell-checker:ignore memrchr + #![allow(dead_code)] // Ignores non-used warning for `borrow_buffer` in `Chunk` @@ -119,27 +121,43 @@ impl RecycledChunk { } } -/// Read a chunk, parse lines and send them. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReadProgress { + /// At least one full line was read and sent as a chunk to the sorter. + SentChunk, + /// Buffer cap reached without a separator; caller should spill the current record. + NeedSpill, + /// No more input remains; nothing was sent. + Finished, + /// No complete line available yet, but more input may remain. + NoChunk, +} + +/// Read a chunk, parse complete records, and send them to the sorter. /// -/// No empty chunk will be sent. If we reach the end of the input, `false` is returned. -/// However, if this function returns `true`, it is not guaranteed that there is still -/// input left: If the input fits _exactly_ into a buffer, we will only notice that there's -/// nothing more to read at the next invocation. In case there is no input left, nothing will -/// be sent. +/// This function attempts to read at least one complete record (delimited by `separator`). +/// Data after the last complete record is left in `carry_over` to be prefixed on the +/// next invocation. Memory growth is bounded by `max_buffer_size` when provided. /// -/// # Arguments +/// # Returns +/// - `SentChunk`: At least one full record was read; a `Chunk` was sent to `sender`. +/// - `NoChunk`: No full record yet, but more input may remain; call again. +/// - `NeedSpill`: The buffer hit the cap with no separator found; caller should spill the +/// current oversized record to a run file and then continue. +/// - `Finished`: No more input remains; nothing was sent. /// -/// (see also `read_to_chunk` for a more detailed documentation) +/// # Arguments /// -/// * `sender`: The sender to send the lines to the sorter. -/// * `recycled_chunk`: The recycled chunk, as returned by `Chunk::recycle`. -/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`) -/// * `max_buffer_size`: How big `buffer` can be. -/// * `carry_over`: The bytes that must be carried over in between invocations. -/// * `file`: The current file. -/// * `next_files`: What `file` should be updated to next. -/// * `separator`: The line separator. -/// * `settings`: The global settings. +/// * `sender`: Channel to send the populated `Chunk` to the sorter thread. +/// * `recycled_chunk`: Result of `Chunk::recycle`, providing reusable vectors and buffer. +/// `buffer.len()` equals its current capacity and will be reused for reading. +/// * `max_buffer_size`: Maximum buffer size in bytes; if `Some`, reading respects this cap. +/// * `carry_over`: Bytes from the previous call after the last separator; they are copied +/// to the beginning of the buffer before reading. +/// * `file`: Current reader. +/// * `next_files`: Iterator to advance to the next file once `file` reaches EOF. +/// * `separator`: Record delimiter (e.g., `b'\n'` or `b'\0'`). +/// * `settings`: Global sort settings (used for tokenization decisions when building `Chunk`). #[allow(clippy::too_many_arguments)] pub fn read( sender: &SyncSender, @@ -147,10 +165,10 @@ pub fn read( max_buffer_size: Option, carry_over: &mut Vec, file: &mut T, - next_files: &mut impl Iterator>, + next_files: &mut dyn Iterator>, separator: u8, settings: &GlobalSettings, -) -> UResult { +) -> UResult { let RecycledChunk { lines, selections, @@ -163,7 +181,7 @@ pub fn read( buffer.resize(carry_over.len() + 10 * 1024, 0); } buffer[..carry_over.len()].copy_from_slice(carry_over); - let (read, should_continue) = read_to_buffer( + let (read, should_continue, need_spill) = read_to_buffer( file, next_files, &mut buffer, @@ -174,6 +192,10 @@ pub fn read( carry_over.clear(); carry_over.extend_from_slice(&buffer[read..]); + if need_spill { + return Ok(ReadProgress::NeedSpill); + } + if read != 0 { let payload: UResult = Chunk::try_new(buffer, |buffer| { let selections = unsafe { @@ -197,8 +219,16 @@ pub fn read( Ok(ChunkContents { lines, line_data }) }); sender.send(payload?).unwrap(); + return Ok(ReadProgress::SentChunk); } - Ok(should_continue) + Ok(if should_continue { + // No full line could be sent now, but there might still be input. + // This case happens when the input exactly fits the buffer without a separator at the end. + // The next call will continue reading and eventually emit a chunk or finish. + ReadProgress::NoChunk + } else { + ReadProgress::Finished + }) } /// Split `read` into `Line`s, and add them to `lines`. @@ -224,88 +254,87 @@ fn parse_lines<'a>( ); } -/// Read from `file` into `buffer`. -/// -/// This function makes sure that at least two lines are read (unless we reach EOF and there's no next file), -/// growing the buffer if necessary. -/// The last line is likely to not have been fully read into the buffer. Its bytes must be copied to -/// the front of the buffer for the next invocation so that it can be continued to be read -/// (see the return values and `start_offset`). -/// -/// # Arguments +/// Read from `file` into `buffer` until at least one complete record is present or EOF. /// -/// * `file`: The file to start reading from. -/// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`, -/// and this function continues reading. -/// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset` -/// as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines. -/// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines. -/// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over -/// from the previous read and should not be overwritten. -/// * `separator`: The byte that separates lines. +/// This function makes sure that at least one complete record (terminated by `separator`) is +/// available in `buffer` (unless we reach EOF and there is no next file). The buffer is grown +/// if necessary, respecting `max_buffer_size` when provided. The bytes after the last complete +/// record remain in `buffer` and should be carried over to the next invocation. /// -/// # Returns +/// Arguments: +/// - `file`: The file to read from initially. +/// - `next_files`: Iterator used to advance to the next file when `file` reaches EOF; reading continues seamlessly. +/// - `buffer`: The destination buffer. Contents from `start_offset` onward will be overwritten by new data. +/// - `max_buffer_size`: Optional cap for `buffer` growth in bytes. +/// - `start_offset`: Number of bytes at the start of `buffer` containing carry-over data that must be preserved. +/// - `separator`: Record delimiter byte. /// -/// * The amount of bytes in `buffer` that can now be interpreted as lines. -/// The remaining bytes must be copied to the start of the buffer for the next invocation, -/// if another invocation is necessary, which is determined by the other return value. -/// * Whether this function should be called again. +/// Returns `(read_len, should_continue, need_spill)`: +/// - `read_len`: The number of bytes in `buffer` that form complete records ready for parsing. +/// - `should_continue`: `true` if more input may remain and another call could read additional data. +/// - `need_spill`: `true` if the buffer reached `max_buffer_size` without encountering a separator, +/// indicating the caller should spill the current oversized record to disk. fn read_to_buffer( file: &mut T, - next_files: &mut impl Iterator>, + next_files: &mut dyn Iterator>, buffer: &mut Vec, max_buffer_size: Option, start_offset: usize, separator: u8, -) -> UResult<(usize, bool)> { +) -> UResult<(usize, bool, bool)> { let mut read_target = &mut buffer[start_offset..]; let mut last_file_empty = true; + // Only search for newlines in regions we haven't scanned before to avoid quadratic behavior. let mut newline_search_offset = 0; let mut found_newline = false; loop { match file.read(read_target) { Ok(0) => { if read_target.is_empty() { - // chunk is full - if let Some(max_buffer_size) = max_buffer_size { - if max_buffer_size > buffer.len() { - // we can grow the buffer + // Buffer full + if let Some(max) = max_buffer_size { + if max > buffer.len() { + // We can grow the buffer let prev_len = buffer.len(); - if buffer.len() < max_buffer_size / 2 { + if buffer.len() < max / 2 { buffer.resize(buffer.len() * 2, 0); } else { - buffer.resize(max_buffer_size, 0); + buffer.resize(max, 0); } read_target = &mut buffer[prev_len..]; continue; } } + // Buffer cannot grow further or exactly filled: find the last newline seen so far let mut sep_iter = memchr_iter(separator, &buffer[newline_search_offset..buffer.len()]).rev(); newline_search_offset = buffer.len(); if let Some(last_line_end) = sep_iter.next() { if found_newline || sep_iter.next().is_some() { - // We read enough lines. - // We want to include the separator here, because it shouldn't be carried over. - return Ok((last_line_end + 1, true)); + // We read enough lines. Include the separator so it isn't carried over. + return Ok((last_line_end + 1, true, false)); } found_newline = true; } - // We need to read more lines + // Need more data for a full line + if let Some(max) = max_buffer_size { + if buffer.len() >= max { + // Hard cap hit and no newline yet: signal spill + return Ok((0, true, true)); + } + } let len = buffer.len(); - // resize the vector to 10 KB more buffer.resize(len + 1024 * 10, 0); read_target = &mut buffer[len..]; } else { // This file has been fully read. let mut leftover_len = read_target.len(); if !last_file_empty { - // The file was not empty. + // The file was not empty: ensure a trailing separator let read_len = buffer.len() - leftover_len; if buffer[read_len - 1] != separator { - // The file did not end with a separator. We have to insert one. buffer[read_len] = separator; leftover_len -= 1; } @@ -319,7 +348,7 @@ fn read_to_buffer( } else { // This was the last file. let read_len = buffer.len() - leftover_len; - return Ok((read_len, false)); + return Ok((read_len, false, false)); } } } @@ -334,3 +363,36 @@ fn read_to_buffer( } } } + +/// Grow `buffer` by at least a minimal increment, up to an optional cap. +/// +/// If `max_buffer_size` is `Some`, the new length will not exceed it. Once the buffer +/// size reaches the cap, no further growth occurs. +/// Otherwise, the buffer grows approximately by doubling, with a minimum increment of 10 KiB. +/// +/// Ensures monotonic growth: the resulting length is always greater than the current length. +fn grow_buffer(buffer: &mut Vec, max_buffer_size: Option) { + const MIN_GROW: usize = 10 * 1024; + let current_len = buffer.len(); + let mut next_len = if current_len == 0 { + MIN_GROW + } else if let Some(max_buffer_size) = max_buffer_size { + if current_len < max_buffer_size { + std::cmp::min(current_len.saturating_mul(2), max_buffer_size) + .max(current_len.saturating_add(MIN_GROW)) + } else { + // Respect the cap: do not grow further. + current_len + } + } else { + current_len + .saturating_mul(2) + .max(current_len.saturating_add(MIN_GROW)) + }; + + if next_len <= current_len { + next_len = current_len.saturating_add(MIN_GROW.max(1)); + } + + buffer.resize(next_len, 0); +} diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index e43ad4b3a38..8ae72e46e87 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -11,20 +11,21 @@ use std::cmp::Ordering; use std::fs::File; -use std::io::Write; +use std::io::{Read, Write}; use std::path::PathBuf; use std::{ - io::Read, - sync::mpsc::{Receiver, SyncSender}, + sync::mpsc::{Receiver, SyncSender, TryRecvError}, thread, }; use itertools::Itertools; -use uucore::error::UResult; +use memchr::memchr; +use uucore::error::{UResult, USimpleError}; use crate::Output; -use crate::chunks::RecycledChunk; +use crate::chunks::{ReadProgress, RecycledChunk}; use crate::merge::ClosedTmpFile; +use crate::merge::MergeInput; use crate::merge::WriteableCompressedTmpFile; use crate::merge::WriteablePlainTmpFile; use crate::merge::WriteableTmpFile; @@ -46,8 +47,10 @@ pub fn ext_sort( output: Output, tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { - let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1); - let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1); + // Allow up to two in-flight chunks in each direction to avoid deadlock + // when pre-filling reads while the sorter is ready to send back. + let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(2); + let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(2); thread::spawn({ let settings = settings.clone(); move || sorter(&recycled_receiver, &sorted_sender, &settings) @@ -100,12 +103,23 @@ fn reader_writer< )?; match read_result { ReadResult::WroteChunksToFile { tmp_files } => { - merge::merge_with_file_limit::<_, _, Tmp>( - tmp_files.into_iter().map(|c| c.reopen()), - settings, - output, - tmp_dir, - )?; + // Optimization: if there is only one temporary run (plain file) and no dedup needed, + // stream it directly to output to avoid re-reading huge records into memory. + if tmp_files.len() == 1 && settings.compress_prog.is_none() && !settings.unique { + let mut reopened = tmp_files.into_iter().next().unwrap().reopen()?; + let mut out = output.into_write(); + std::io::copy(reopened.as_read(), &mut out) + .map_err(|e| USimpleError::new(2, e.to_string()))?; + out.flush() + .map_err(|e| USimpleError::new(2, e.to_string()))?; + } else { + merge::merge_with_file_limit::<_, _, Tmp>( + tmp_files.into_iter().map(|c| c.reopen()), + settings, + output, + tmp_dir, + )?; + } } ReadResult::SortedSingleChunk(chunk) => { if settings.unique { @@ -187,75 +201,233 @@ fn read_write_loop( sender: SyncSender, ) -> UResult> { let mut file = files.next().unwrap()?; - let mut carry_over = vec![]; - // kick things off with two reads - for _ in 0..2 { - let should_continue = chunks::read( - &sender, - RecycledChunk::new(if START_BUFFER_SIZE < buffer_size { - START_BUFFER_SIZE - } else { - buffer_size - }), - Some(buffer_size), - &mut carry_over, - &mut file, - &mut files, - separator, - settings, - )?; - if !should_continue { - drop(sender); - // We have already read the whole input. Since we are in our first two reads, - // this means that we can fit the whole input into memory. Bypass writing below and - // handle this case in a more straightforward way. - return Ok(if let Ok(first_chunk) = receiver.recv() { - if let Ok(second_chunk) = receiver.recv() { - ReadResult::SortedTwoChunks([first_chunk, second_chunk]) + // Maintain up to two in-flight reads to keep the sorter busy + let mut recycled_pool: Vec = + std::iter::repeat_with(|| RecycledChunk::new(START_BUFFER_SIZE.min(buffer_size))) + .take(2) + .collect(); + let mut in_flight = 0usize; + let mut sender_option = Some(sender); + let mut tmp_files: Vec = vec![]; + let mut mem_chunks: Vec = vec![]; + + // Helper to try reading and sending more chunks or spilling long records + let try_read_more = |recycled_pool: &mut Vec, + in_flight: &mut usize, + sender_option: &mut Option>, + tmp_files: &mut Vec, + carry_over: &mut Vec, + file: &mut Box, + files: &mut dyn Iterator>>, + tmp_dir: &mut TmpDirWrapper| + -> UResult<()> { + while sender_option.is_some() && *in_flight < 2 { + let recycled = if let Some(rc) = recycled_pool.pop() { + rc + } else { + RecycledChunk::new(if START_BUFFER_SIZE < buffer_size { + START_BUFFER_SIZE } else { - ReadResult::SortedSingleChunk(first_chunk) + buffer_size + }) + }; + match chunks::read( + sender_option.as_ref().unwrap(), + recycled, + Some(buffer_size), + carry_over, + file, + files, + separator, + settings, + )? { + ReadProgress::SentChunk => { + *in_flight += 1; } - } else { - ReadResult::EmptyInput - }); + ReadProgress::NeedSpill => { + // Spill this oversized record into its own run file + let tmp = spill_long_record::( + tmp_dir, + carry_over, + file.as_mut(), + separator, + settings.compress_prog.as_deref(), + )?; + tmp_files.push(tmp); + // Try to read again (do not change in_flight) + } + ReadProgress::NoChunk => { + // Nothing to send yet; try reading again (continue loop) + } + ReadProgress::Finished => { + *sender_option = None; + break; + } + } } - } + Ok(()) + }; + + // Initial fill + try_read_more( + &mut recycled_pool, + &mut in_flight, + &mut sender_option, + &mut tmp_files, + &mut carry_over, + &mut file, + &mut files, + tmp_dir, + )?; - let mut sender_option = Some(sender); - let mut tmp_files = vec![]; loop { - let Ok(chunk) = receiver.recv() else { - return Ok(ReadResult::WroteChunksToFile { tmp_files }); - }; + if in_flight > 0 { + let Ok(chunk) = receiver.recv() else { + // Sender dropped; finish by merging whatever we have + break; + }; - let tmp_file = write::( - &chunk, - tmp_dir.next_file()?, - settings.compress_prog.as_deref(), - separator, - )?; - tmp_files.push(tmp_file); + in_flight -= 1; + if tmp_files.is_empty() && sender_option.is_none() && mem_chunks.len() < 2 { + // Potential small input: keep in memory for fast path + mem_chunks.push(chunk); + } else { + // General path: write to tmp file + let tmp_file = write::( + &chunk, + tmp_dir.next_file()?, + settings.compress_prog.as_deref(), + separator, + )?; + tmp_files.push(tmp_file); + // Recycle buffer for next reads + recycled_pool.push(chunk.recycle()); + } - let recycled_chunk = chunk.recycle(); + // Attempt to fill again + try_read_more( + &mut recycled_pool, + &mut in_flight, + &mut sender_option, + &mut tmp_files, + &mut carry_over, + &mut file, + &mut files, + tmp_dir, + )?; + } else { + if sender_option.is_none() { + // No more reads possible and no in-flight chunks + if tmp_files.is_empty() { + return Ok(match mem_chunks.len() { + 0 => ReadResult::EmptyInput, + 1 => ReadResult::SortedSingleChunk(mem_chunks.pop().unwrap()), + 2 => ReadResult::SortedTwoChunks([ + mem_chunks.remove(0), + mem_chunks.remove(0), + ]), + _ => unreachable!(), + }); + } + // Flush any in-memory chunks to tmp and finish with merge + for ch in mem_chunks.drain(..) { + let tmp_file = write::( + &ch, + tmp_dir.next_file()?, + settings.compress_prog.as_deref(), + separator, + )?; + tmp_files.push(tmp_file); + } + return Ok(ReadResult::WroteChunksToFile { tmp_files }); + } - if let Some(sender) = &sender_option { - let should_continue = chunks::read( - sender, - recycled_chunk, - None, + // Try reading more if possible + try_read_more( + &mut recycled_pool, + &mut in_flight, + &mut sender_option, + &mut tmp_files, &mut carry_over, &mut file, &mut files, - separator, - settings, + tmp_dir, )?; - if !should_continue { - sender_option = None; + + if in_flight == 0 { + // No chunk to receive yet; loop continues until either we can read or finish + // To avoid busy-looping, try a non-blocking receive in case a chunk just arrived + match receiver.try_recv() { + Ok(chunk) => { + // Process as above + if tmp_files.is_empty() && sender_option.is_none() && mem_chunks.len() < 2 { + mem_chunks.push(chunk); + } else { + let tmp_file = write::( + &chunk, + tmp_dir.next_file()?, + settings.compress_prog.as_deref(), + separator, + )?; + tmp_files.push(tmp_file); + recycled_pool.push(chunk.recycle()); + } + } + Err(TryRecvError::Empty) => { + // nothing to do right now + } + Err(TryRecvError::Disconnected) => break, + } + } + } + } + + Ok(ReadResult::WroteChunksToFile { tmp_files }) +} + +/// Spill a single oversized record into its own temporary run file. +fn spill_long_record( + tmp_dir: &mut TmpDirWrapper, + carry_over: &mut Vec, + file: &mut dyn Read, + separator: u8, + compress_prog: Option<&str>, +) -> UResult { + let mut tmp_file = I::create(tmp_dir.next_file()?, compress_prog)?; + if !carry_over.is_empty() { + tmp_file.as_write().write_all(carry_over).unwrap(); + carry_over.clear(); + } + let mut buf = vec![0u8; 128 * 1024]; + let mut _current_file_had_data = false; + let mut _last_byte: Option = None; + loop { + match file.read(&mut buf) { + Ok(0) => { + // EOF: end current record here + break; } + Ok(n) => { + _current_file_had_data = true; + if let Some(pos) = memchr(separator, &buf[..n]) { + // End of record found within this chunk + tmp_file.as_write().write_all(&buf[..pos]).unwrap(); + // Save remainder after separator for next reads + carry_over.extend_from_slice(&buf[pos + 1..n]); + _last_byte = Some(buf[n - 1]); + break; + } + tmp_file.as_write().write_all(&buf[..n]).unwrap(); + _last_byte = Some(buf[n - 1]); + } + Err(e) => return Err(e.into()), } } + // Append a separator to the run to match write_lines semantics + tmp_file.as_write().write_all(&[separator]).unwrap(); + tmp_file.finished_writing() } /// Write the lines in `chunk` to `file`, separated by `separator`. diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index 1e538c6d931..b6771560d28 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -218,6 +218,8 @@ fn reader( settings: &GlobalSettings, separator: u8, ) -> UResult<()> { + // Use a bounded buffer similar to ext_sort + let buffer_size = settings.buffer_size / 10; for (file_idx, recycled_chunk) in recycled_receiver { if let Some(ReaderFile { file, @@ -225,17 +227,30 @@ fn reader( carry_over, }) = &mut files[file_idx] { - let should_continue = chunks::read( + let progress = chunks::read( sender, recycled_chunk, - None, + Some(buffer_size), carry_over, file.as_read(), &mut iter::empty(), separator, settings, )?; - if !should_continue { + if matches!(progress, chunks::ReadProgress::NeedSpill) { + // Fallback: allow a one-off unbounded read for this oversized record + let _ = chunks::read( + sender, + RecycledChunk::new(buffer_size), + None, + carry_over, + file.as_read(), + &mut iter::empty(), + separator, + settings, + )?; + } + if matches!(progress, chunks::ReadProgress::Finished) { // Remove the file from the list by replacing it with `None`. let ReaderFile { file, .. } = files[file_idx].take().unwrap(); // Depending on the kind of the `MergeInput`, this may delete the file: