diff --git a/src/uu/sort/BENCHMARKING.md b/src/uu/sort/BENCHMARKING.md
index 52866719d..fd728c41d 100644
--- a/src/uu/sort/BENCHMARKING.md
+++ b/src/uu/sort/BENCHMARKING.md
@@ -72,7 +72,8 @@ Run `cargo build --release` before benchmarking after you make a change!
 ## External sorting
 
 Try running commands with the `-S` option set to an amount of memory to be used, such as `1M`. Additionally, you could try sorting
-huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
+huge files (ideally multiple gigabytes) with `-S` (or without `-S`, to benchmark our default buffer size).
+Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
 multiple times (this will add the contents of `shuffled_wordlist.txt` to itself).
 
 Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'`
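If you would rather script the file-doubling loop described above than run it by hand, a rough Rust equivalent is sketched below. The `grow_fixture` helper is hypothetical (not part of the repo), and unlike the shell loop it only appends, skipping the re-shuffle that `sort -R` performs on each pass:

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};

/// Double the size of the file at `path` `doublings` times by appending
/// its current contents to itself.
fn grow_fixture(path: &str, doublings: u32) -> io::Result<()> {
    for _ in 0..doublings {
        let snapshot = fs::read(path)?;
        let mut file = OpenOptions::new().append(true).open(path)?;
        file.write_all(&snapshot)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // Eight doublings turn a ~16 MB word list into a ~4 GB test file.
    grow_fixture("shuffled_wordlist.txt", 8)
}
```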
diff --git a/src/uu/sort/src/check.rs b/src/uu/sort/src/check.rs
index fe815b624..01b5a25b5 100644
--- a/src/uu/sort/src/check.rs
+++ b/src/uu/sort/src/check.rs
@@ -87,6 +87,7 @@ fn reader(
         chunks::read(
             &mut sender,
             recycled_buffer,
+            None,
             &mut carry_over,
             &mut file,
             &mut iter::empty(),
diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs
index c679980ec..7a7749003 100644
--- a/src/uu/sort/src/chunks.rs
+++ b/src/uu/sort/src/chunks.rs
@@ -52,13 +52,20 @@ impl Chunk {
 
 /// Read a chunk, parse lines and send them.
 ///
-/// No empty chunk will be sent.
+/// No empty chunk will be sent. If we reach the end of the input, `sender_option`
+/// is set to `None`. However, if this function does not set `sender_option` to `None`,
+/// it is not guaranteed that there is input left: if the input fits _exactly_
+/// into a buffer, we will only notice that there is nothing more to read at the next
+/// invocation.
 ///
 /// # Arguments
 ///
-/// * `sender_option`: The sender to send the lines to the sorter. If `None`, does nothing.
+/// (see also `read_to_buffer` for more detailed documentation)
+///
+/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
 /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
 /// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
+/// * `max_buffer_size`: The maximum size `buffer` may grow to.
 /// * `carry_over`: The bytes that must be carried over in between invocations.
 /// * `file`: The current file.
 /// * `next_files`: What `file` should be updated to next.
@@ -69,6 +76,7 @@ impl Chunk {
 pub fn read(
     sender_option: &mut Option<SyncSender<Chunk>>,
     mut buffer: Vec<u8>,
+    max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
     file: &mut Box<dyn Read + Send>,
     next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
@@ -82,8 +90,14 @@ pub fn read(
         buffer.resize(carry_over.len() + 10 * 1024, 0);
     }
     buffer[..carry_over.len()].copy_from_slice(&carry_over);
-    let (read, should_continue) =
-        read_to_buffer(file, next_files, &mut buffer, carry_over.len(), separator);
+    let (read, should_continue) = read_to_buffer(
+        file,
+        next_files,
+        &mut buffer,
+        max_buffer_size,
+        carry_over.len(),
+        separator,
+    );
     carry_over.clear();
     carry_over.extend_from_slice(&buffer[read..]);
 
@@ -138,7 +152,8 @@ fn parse_lines<'a>(
 /// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
 /// and this function continues reading.
 /// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
-/// as well). It will not be grown by default, unless that is necessary to read at least two lines.
+/// as well). It will be grown up to `max_buffer_size` if necessary, but it will always be grown enough to read at least two lines.
+/// * `max_buffer_size`: Grow the buffer to at most this length. If `None`, the buffer will not be grown, unless that is needed to read at least two lines.
 /// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
 /// from the previous read and should not be overwritten.
 /// * `separator`: The byte that separates lines.
@@ -153,6 +168,7 @@ fn read_to_buffer(
     file: &mut Box<dyn Read + Send>,
     next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
     buffer: &mut Vec<u8>,
+    max_buffer_size: Option<usize>,
     start_offset: usize,
     separator: u8,
 ) -> (usize, bool) {
@@ -162,6 +178,19 @@ fn read_to_buffer(
             Ok(0) => {
                 if read_target.is_empty() {
                     // chunk is full
+                    if let Some(max_buffer_size) = max_buffer_size {
+                        if max_buffer_size > buffer.len() {
+                            // we can grow the buffer
+                            let prev_len = buffer.len();
+                            if buffer.len() < max_buffer_size / 2 {
+                                buffer.resize(buffer.len() * 2, 0);
+                            } else {
+                                buffer.resize(max_buffer_size, 0);
+                            }
+                            read_target = &mut buffer[prev_len..];
+                            continue;
+                        }
+                    }
                     let mut sep_iter = memchr_iter(separator, &buffer).rev();
                     let last_line_end = sep_iter.next();
                     if sep_iter.next().is_some() {
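The growth policy added to `read_to_buffer` above doubles the buffer as long as doubling stays below `max_buffer_size`, and otherwise clamps to it. A self-contained sketch of that policy (the `grow` helper is hypothetical, not code from this patch):

```rust
/// Grow `buffer` toward `max`: double it while doubling stays below `max`,
/// otherwise clamp to `max`. Returns the range of newly added bytes that
/// the caller should read into next.
fn grow(buffer: &mut Vec<u8>, max: usize) -> std::ops::Range<usize> {
    let prev_len = buffer.len();
    if buffer.len() < max / 2 {
        buffer.resize(buffer.len() * 2, 0);
    } else {
        buffer.resize(max, 0);
    }
    prev_len..buffer.len()
}

fn main() {
    let mut buf = vec![0u8; 8_000];
    // With max = 100_000 the sizes go 8_000 -> 16_000 -> 32_000 -> 64_000 -> 100_000.
    while buf.len() < 100_000 {
        let new_bytes = grow(&mut buf, 100_000);
        println!("grew to {} (new bytes: {:?})", buf.len(), new_bytes);
    }
}
```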
diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs
index 629ebb714..a304bf7c0 100644
--- a/src/uu/sort/src/ext_sort.rs
+++ b/src/uu/sort/src/ext_sort.rs
@@ -5,12 +5,13 @@
 // * For the full copyright and license information, please view the LICENSE
 // * file that was distributed with this source code.
 
-//! Sort big files by using files for storing intermediate chunks.
+//! Sort big files by using auxiliary files for storing intermediate chunks.
 //!
 //! Files are read into chunks of memory which are then sorted individually and
 //! written to temporary files. There are two threads: One sorter, and one reader/writer.
 //! The buffers for the individual chunks are recycled. There are two buffers.
 
+use std::cmp::Ordering;
 use std::io::{BufWriter, Write};
 use std::path::Path;
 use std::{
@@ -20,30 +21,19 @@ use std::{
     thread,
 };
 
+use itertools::Itertools;
+
 use tempdir::TempDir;
 
 use crate::{
     chunks::{self, Chunk},
-    merge::{self, FileMerger},
-    sort_by, GlobalSettings,
+    compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
 
-/// Iterator that wraps the
-pub struct ExtSortedMerger<'a> {
-    pub file_merger: FileMerger<'a>,
-    // Keep _tmp_dir around, as it is deleted when dropped.
-    _tmp_dir: TempDir,
-}
+const MIN_BUFFER_SIZE: usize = 8_000;
 
-/// Sort big files by using files for storing intermediate chunks.
-///
-/// # Returns
-///
-/// An iterator that merges intermediate files back together.
-pub fn ext_sort<'a>(
-    files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
-    settings: &'a GlobalSettings,
-) -> ExtSortedMerger<'a> {
+/// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
+pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
     let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
@@ -51,7 +41,7 @@ pub fn ext_sort<'a>(
         let settings = settings.clone();
         move || sorter(recycled_receiver, sorted_sender, settings)
     });
-    let chunks_read = reader_writer(
+    let read_result = reader_writer(
         files,
         &tmp_dir,
         if settings.zero_terminated {
@@ -66,13 +56,29 @@ pub fn ext_sort<'a>(
         sorted_receiver,
         recycled_sender,
     );
-    let files = (0..chunks_read)
-        .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
-        .collect::<Vec<_>>();
-
-    ExtSortedMerger {
-        file_merger: merge::merge(&files, settings),
-        _tmp_dir: tmp_dir,
+    match read_result {
+        ReadResult::WroteChunksToFile { chunks_written } => {
+            let files = (0..chunks_written)
+                .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
+                .collect::<Vec<_>>();
+            let mut merger = merge::merge(&files, settings);
+            merger.write_all(settings);
+        }
+        ReadResult::SortedSingleChunk(chunk) => {
+            output_sorted_lines(chunk.borrow_lines().iter(), settings);
+        }
+        ReadResult::SortedTwoChunks([a, b]) => {
+            let merged_iter = a
+                .borrow_lines()
+                .iter()
+                .merge_by(b.borrow_lines().iter(), |line_a, line_b| {
+                    compare_by(line_a, line_b, settings) != Ordering::Greater
+                });
+            output_sorted_lines(merged_iter, settings);
+        }
+        ReadResult::EmptyInput => {
+            // don't output anything
+        }
     }
 }
@@ -84,6 +90,21 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) {
     }
 }
 
+/// Describes how we read the chunks from the input.
+enum ReadResult {
+    /// The input was empty. Nothing was read.
+    EmptyInput,
+    /// The input fits into a single Chunk, which was kept in memory.
+    SortedSingleChunk(Chunk),
+    /// The input fits into two chunks, which were kept in memory.
+    SortedTwoChunks([Chunk; 2]),
+    /// The input was read into multiple chunks, which were written to auxiliary files.
+    WroteChunksToFile {
+        /// The number of chunks written to auxiliary files.
+        chunks_written: usize,
+    },
+}
+
 /// The function that is executed on the reader/writer thread.
 ///
 /// # Returns
@@ -96,7 +117,7 @@ fn reader_writer(
     settings: GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> usize {
+) -> ReadResult {
     let mut sender_option = Some(sender);
 
     let mut file = files.next().unwrap();
@@ -106,21 +127,40 @@ fn reader_writer(
     for _ in 0..2 {
         chunks::read(
             &mut sender_option,
-            vec![0; buffer_size],
+            vec![0; MIN_BUFFER_SIZE],
+            Some(buffer_size),
             &mut carry_over,
             &mut file,
             &mut files,
             separator,
             Vec::new(),
            &settings,
-        )
+        );
+        if sender_option.is_none() {
+            // We have already read the whole input. Since we are in our first two reads,
+            // this means that we can fit the whole input into memory. Bypass writing below and
+            // handle this case in a more straightforward way.
+            return if let Ok(first_chunk) = receiver.recv() {
+                if let Ok(second_chunk) = receiver.recv() {
+                    ReadResult::SortedTwoChunks([first_chunk, second_chunk])
+                } else {
+                    ReadResult::SortedSingleChunk(first_chunk)
+                }
+            } else {
+                ReadResult::EmptyInput
+            };
+        }
     }
 
     let mut file_number = 0;
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
-            _ => return file_number,
+            _ => {
+                return ReadResult::WroteChunksToFile {
+                    chunks_written: file_number,
+                }
+            }
         };
 
         write(
@@ -129,13 +169,14 @@ fn reader_writer(
             separator,
         );
 
-        let (recycled_lines, recycled_buffer) = chunk.recycle();
-        file_number += 1;
+        file_number += 1;
+
+        let (recycled_lines, recycled_buffer) = chunk.recycle();
+
         chunks::read(
             &mut sender_option,
             recycled_buffer,
+            None,
             &mut carry_over,
             &mut file,
             &mut files,
diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs
index 6f7cdfed7..48d48ad40 100644
--- a/src/uu/sort/src/merge.rs
+++ b/src/uu/sort/src/merge.rs
@@ -108,6 +108,7 @@ fn reader(
         chunks::read(
             sender,
             recycled_buffer,
+            None,
             carry_over,
             file,
             &mut iter::empty(),
diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index b6ab5a2b1..bc3b65492 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -93,7 +93,10 @@ static THOUSANDS_SEP: char = ',';
 static NEGATIVE: char = '-';
 static POSITIVE: char = '+';
 
-static DEFAULT_BUF_SIZE: usize = std::usize::MAX;
+// Choosing a higher buffer size does not result in performance improvements
+// (at least not on my machine). TODO: In the future, we should also take the amount of
+// available memory into consideration, instead of relying on this constant only.
+static DEFAULT_BUF_SIZE: usize = 1_000_000_000; // 1 GB
 
 #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)]
 enum SortMode {
@@ -127,28 +130,35 @@ pub struct GlobalSettings {
     zero_terminated: bool,
     buffer_size: usize,
     tmp_dir: PathBuf,
-    ext_sort: bool,
 }
 
 impl GlobalSettings {
-    // It's back to do conversions for command line opts!
-    // Probably want to do through numstrcmp somehow now?
-    fn human_numeric_convert(a: &str) -> usize {
-        let num_str = &a[get_leading_gen(a)];
-        let (_, suf_str) = a.split_at(num_str.len());
-        let num_usize = num_str
-            .parse::<usize>()
-            .expect("Error parsing buffer size: ");
-        let suf_usize: usize = match suf_str.to_uppercase().as_str() {
-            // SI Units
-            "B" => 1usize,
-            "K" => 1000usize,
-            "M" => 1000000usize,
-            "G" => 1000000000usize,
-            // GNU regards empty human numeric values as K by default
-            _ => 1000usize,
-        };
-        num_usize * suf_usize
+    /// Interpret `input` as a number with an optional trailing SI unit.
+    ///
+    /// If there is no trailing SI unit, the implicit unit is K.
+    /// The suffix B causes the number to be interpreted as a byte count.
+    fn parse_byte_count(input: &str) -> usize {
+        const SI_UNITS: &[char] = &['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'];
+
+        let input = input.trim();
+
+        let (num_str, si_unit) =
+            if input.ends_with(|c: char| SI_UNITS.contains(&c.to_ascii_uppercase())) {
+                let mut chars = input.chars();
+                let si_suffix = chars.next_back().unwrap().to_ascii_uppercase();
+                let si_unit = SI_UNITS.iter().position(|&c| c == si_suffix).unwrap();
+                let num_str = chars.as_str();
+                (num_str, si_unit)
+            } else {
+                (input, 1)
+            };
+
+        let num_usize: usize = num_str
+            .trim()
+            .parse()
+            .unwrap_or_else(|e| crash!(1, "failed to parse buffer size `{}`: {}", num_str, e));
+
+        num_usize.saturating_mul(1000usize.saturating_pow(si_unit as u32))
     }
 
     fn out_writer(&self) -> BufWriter<Box<dyn Write>> {
@@ -189,7 +199,6 @@ impl Default for GlobalSettings {
             zero_terminated: false,
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
-            ext_sort: false,
         }
     }
 }
@@ -941,28 +950,15 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         env::set_var("RAYON_NUM_THREADS", &settings.threads);
     }
 
-    if matches.is_present(OPT_BUF_SIZE) {
-        settings.buffer_size = {
-            let input = matches
-                .value_of(OPT_BUF_SIZE)
-                .map(String::from)
-                .unwrap_or(format!("{}", DEFAULT_BUF_SIZE));
+    settings.buffer_size = matches
+        .value_of(OPT_BUF_SIZE)
+        .map(GlobalSettings::parse_byte_count)
+        .unwrap_or(DEFAULT_BUF_SIZE);
 
-            GlobalSettings::human_numeric_convert(&input)
-        };
-        settings.ext_sort = true;
-    }
-
-    if matches.is_present(OPT_TMP_DIR) {
-        let result = matches
-            .value_of(OPT_TMP_DIR)
-            .map(String::from)
-            .unwrap_or(format!("{}", env::temp_dir().display()));
-        settings.tmp_dir = PathBuf::from(result);
-        settings.ext_sort = true;
-    } else {
-        settings.tmp_dir = env::temp_dir();
-    }
+    settings.tmp_dir = matches
+        .value_of(OPT_TMP_DIR)
+        .map(PathBuf::from)
+        .unwrap_or_else(env::temp_dir);
 
     settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
     settings.merge = matches.is_present(OPT_MERGE);
@@ -1047,7 +1043,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
     exec(&files, &settings)
 }
 
-fn output_sorted_lines<'a>(iter: impl Iterator<Item = Line<'a>>, settings: &GlobalSettings) {
+fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) {
     if settings.unique {
         print_sorted(
             iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
@@ -1067,34 +1063,10 @@ fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
             crash!(1, "only one file allowed with -c");
         }
         return check::check(files.first().unwrap(), settings);
-    } else if settings.ext_sort {
+    } else {
         let mut lines = files.iter().filter_map(open);
-        let mut sorted = ext_sort(&mut lines, &settings);
-        sorted.file_merger.write_all(settings);
-    } else {
-        let separator = if settings.zero_terminated { '\0' } else { '\n' };
-        let mut lines = vec![];
-        let mut full_string = String::new();
-
-        for mut file in files.iter().filter_map(open) {
-            crash_if_err!(1, file.read_to_string(&mut full_string));
-
-            if !full_string.ends_with(separator) {
-                full_string.push(separator);
-            }
-        }
-
-        if full_string.ends_with(separator) {
-            full_string.pop();
-        }
-
-        for line in full_string.split(if settings.zero_terminated { '\0' } else { '\n' }) {
-            lines.push(Line::create(line, &settings));
-        }
-
-        sort_by(&mut lines, &settings);
-        output_sorted_lines(lines.into_iter(), &settings);
+        ext_sort(&mut lines, &settings);
     }
     0
 }
@@ -1366,7 +1338,7 @@ fn version_compare(a: &str, b: &str) -> Ordering {
     }
 }
 
-fn print_sorted<'a, T: Iterator<Item = Line<'a>>>(iter: T, settings: &GlobalSettings) {
+fn print_sorted<'a, T: Iterator<Item = &'a Line<'a>>>(iter: T, settings: &GlobalSettings) {
     let mut writer = settings.out_writer();
     for line in iter {
         line.print(&mut writer, settings);
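For reference, here is a standalone mirror of the parsing rules `parse_byte_count` implements above. The `parse` helper is hypothetical (the real function is a private method and calls `crash!` on errors), but the arithmetic is the same:

```rust
/// Standalone mirror of the `parse_byte_count` rules above
/// (same logic, minus the `crash!` error path), for experimenting.
fn parse(input: &str) -> Option<usize> {
    const SI_UNITS: &[char] = &['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'];
    let input = input.trim();
    let (num_str, exponent) =
        if input.ends_with(|c: char| SI_UNITS.contains(&c.to_ascii_uppercase())) {
            let mut chars = input.chars();
            let suffix = chars.next_back().unwrap().to_ascii_uppercase();
            let exponent = SI_UNITS.iter().position(|&c| c == suffix).unwrap();
            (chars.as_str(), exponent)
        } else {
            (input, 1) // a bare number is interpreted as kilobytes
        };
    let num: usize = num_str.trim().parse().ok()?;
    Some(num.saturating_mul(1000usize.saturating_pow(exponent as u32)))
}

fn main() {
    assert_eq!(parse("10B"), Some(10)); // B = bytes
    assert_eq!(parse("10"), Some(10_000)); // implicit K
    assert_eq!(parse("10m"), Some(10_000_000)); // suffixes are case-insensitive
    assert_eq!(parse("1Y"), Some(usize::MAX)); // saturates instead of overflowing (64-bit)
    assert_eq!(parse("100f"), None); // 'f' is no SI unit, so "100f" fails to parse
}
```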
diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs
index e89d18054..23705d2ee 100644
--- a/tests/by-util/test_sort.rs
+++ b/tests/by-util/test_sort.rs
@@ -15,40 +15,35 @@ fn test_helper(file_name: &str, args: &str) {
         .stdout_is_fixture(format!("{}.expected.debug", file_name));
 }
 
-// FYI, the initialization size of our Line struct is 96 bytes.
-//
-// At very small buffer sizes, with that overhead we are certainly going
-// to overrun our buffer way, way, way too quickly because of these excess
-// bytes for the struct.
-//
-// For instance, seq 0..20000 > ...text = 108894 bytes
-// But overhead is 1920000 + 108894 = 2028894 bytes
-//
-// Or kjvbible-random.txt = 4332506 bytes, but minimum size of its
-// 99817 lines in memory * 96 bytes = 9582432 bytes
-//
-// Here, we test 108894 bytes with a 50K buffer
-//
 #[test]
-fn test_larger_than_specified_segment() {
-    new_ucmd!()
-        .arg("-n")
-        .arg("-S")
-        .arg("50K")
-        .arg("ext_sort.txt")
-        .succeeds()
-        .stdout_is_fixture("ext_sort.expected");
+fn test_buffer_sizes() {
+    let buffer_sizes = [
+        "0", "50K", "50k", "1M", "100M", "1000G", "10T", "500E", "1Y",
+    ];
+    for buffer_size in &buffer_sizes {
+        new_ucmd!()
+            .arg("-n")
+            .arg("-S")
+            .arg(buffer_size)
+            .arg("ext_sort.txt")
+            .succeeds()
+            .stdout_is_fixture("ext_sort.expected");
+    }
 }
 
 #[test]
-fn test_smaller_than_specified_segment() {
-    new_ucmd!()
-        .arg("-n")
-        .arg("-S")
-        .arg("100M")
-        .arg("ext_sort.txt")
-        .succeeds()
-        .stdout_is_fixture("ext_sort.expected");
+fn test_invalid_buffer_size() {
+    let buffer_sizes = ["asd", "100f"];
+    for invalid_buffer_size in &buffer_sizes {
+        new_ucmd!()
+            .arg("-S")
+            .arg(invalid_buffer_size)
+            .fails()
+            .stderr_only(format!(
+                "sort: error: failed to parse buffer size `{}`: invalid digit found in string",
+                invalid_buffer_size
+            ));
+    }
 }
 
 #[test]
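A note on the message asserted by `test_invalid_buffer_size`: since `f` is not in `SI_UNITS`, the whole string `100f` is handed to `str::parse::<usize>()`, so the error text comes straight from the standard library's `ParseIntError`:

```rust
fn main() {
    // "100f" has no recognized SI suffix, so the entire string reaches
    // `str::parse::<usize>()`, whose error message the test matches.
    let err = "100f".parse::<usize>().unwrap_err();
    assert_eq!(err.to_string(), "invalid digit found in string");
}
```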