From e7da8058dc27dc106af66b206318faeec6cf9938 Mon Sep 17 00:00:00 2001
From: Michael Debertol
Date: Fri, 21 May 2021 23:00:13 +0200
Subject: [PATCH 1/2] sort: automatically fall back to extsort

To make this work we make default sort a special case of external sort.
External sorting uses auxiliary files for intermediate chunks. However,
when we can keep our intermediate chunks in memory, we don't write them
to the file system at all. Only when we notice that we can't keep them
in memory are they written to disk.

Additionally, we no longer allocate buffers with the capacity of their
maximum size. Instead, they start with a capacity of 8 kB and are grown
only when needed.

This makes sorting smaller files about as fast as it was before (I'm
seeing a regression of ~3%), and allows us to seamlessly continue with
auxiliary files when needed.
---
 src/uu/sort/BENCHMARKING.md |   3 +-
 src/uu/sort/src/check.rs    |   1 +
 src/uu/sort/src/chunks.rs   |  39 ++++++++++++--
 src/uu/sort/src/ext_sort.rs | 105 +++++++++++++++++++++++++-----------
 src/uu/sort/src/merge.rs    |   1 +
 src/uu/sort/src/sort.rs     |  68 ++++++-----------------
 tests/by-util/test_sort.rs  |  33 ++++--------
 7 files changed, 138 insertions(+), 112 deletions(-)

diff --git a/src/uu/sort/BENCHMARKING.md b/src/uu/sort/BENCHMARKING.md
index 52866719d..fd728c41d 100644
--- a/src/uu/sort/BENCHMARKING.md
+++ b/src/uu/sort/BENCHMARKING.md
@@ -72,7 +72,8 @@ Run `cargo build --release` before benchmarking after you make a change!
 ## External sorting
 
 Try running commands with the `-S` option set to an amount of memory to be used, such as `1M`. Additionally, you could try sorting
-huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
+huge files (ideally multiple Gigabytes) with `-S` (or without `-S` to benchmark with our default value).
+Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
 multiple times (this will add the contents of `shuffled_wordlist.txt` to itself).
 Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'`
 
diff --git a/src/uu/sort/src/check.rs b/src/uu/sort/src/check.rs
index fe815b624..01b5a25b5 100644
--- a/src/uu/sort/src/check.rs
+++ b/src/uu/sort/src/check.rs
@@ -87,6 +87,7 @@ fn reader(
         chunks::read(
             &mut sender,
             recycled_buffer,
+            None,
             &mut carry_over,
             &mut file,
             &mut iter::empty(),
diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs
index c679980ec..7a7749003 100644
--- a/src/uu/sort/src/chunks.rs
+++ b/src/uu/sort/src/chunks.rs
@@ -52,13 +52,20 @@ impl Chunk {
 
 /// Read a chunk, parse lines and send them.
 ///
-/// No empty chunk will be sent.
+/// No empty chunk will be sent. If we reach the end of the input, sender_option
+/// is set to None. However, if this function does not set sender_option to None,
+/// it is not guaranteed that there is still input left: if the input fits _exactly_
+/// into a buffer, we will only notice that there's nothing more to read on the next
+/// invocation.
 ///
 /// # Arguments
 ///
-/// * `sender_option`: The sender to send the lines to the sorter. If `None`, does nothing.
+/// (see also `read_to_buffer` for more detailed documentation)
+///
+/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
 /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
 ///   (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
+/// * `max_buffer_size`: How big `buffer` can be.
 /// * `carry_over`: The bytes that must be carried over in between invocations.
 /// * `file`: The current file.
 /// * `next_files`: What `file` should be updated to next.
@@ -69,6 +76,7 @@ impl Chunk {
 pub fn read(
     sender_option: &mut Option<SyncSender<Chunk>>,
     mut buffer: Vec<u8>,
+    max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
     file: &mut Box<dyn Read + Send>,
     next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
@@ -82,8 +90,14 @@ pub fn read(
         buffer.resize(carry_over.len() + 10 * 1024, 0);
     }
     buffer[..carry_over.len()].copy_from_slice(&carry_over);
-    let (read, should_continue) =
-        read_to_buffer(file, next_files, &mut buffer, carry_over.len(), separator);
+    let (read, should_continue) = read_to_buffer(
+        file,
+        next_files,
+        &mut buffer,
+        max_buffer_size,
+        carry_over.len(),
+        separator,
+    );
     carry_over.clear();
     carry_over.extend_from_slice(&buffer[read..]);
 
@@ -138,7 +152,8 @@ fn parse_lines<'a>(
 /// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
 ///   and this function continues reading.
 /// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
-///   as well). It will not be grown by default, unless that is necessary to read at least two lines.
+///   as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines.
+/// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines.
 /// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
 ///   from the previous read and should not be overwritten.
 /// * `separator`: The byte that separates lines.
@@ -153,6 +168,7 @@ fn read_to_buffer(
     file: &mut Box<dyn Read + Send>,
     next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
     buffer: &mut Vec<u8>,
+    max_buffer_size: Option<usize>,
     start_offset: usize,
     separator: u8,
 ) -> (usize, bool) {
@@ -162,6 +178,19 @@ fn read_to_buffer(
             Ok(0) => {
                 if read_target.is_empty() {
                     // chunk is full
+                    if let Some(max_buffer_size) = max_buffer_size {
+                        if max_buffer_size > buffer.len() {
+                            // we can grow the buffer
+                            let prev_len = buffer.len();
+                            if buffer.len() < max_buffer_size / 2 {
+                                buffer.resize(buffer.len() * 2, 0);
+                            } else {
+                                buffer.resize(max_buffer_size, 0);
+                            }
+                            read_target = &mut buffer[prev_len..];
+                            continue;
+                        }
+                    }
                     let mut sep_iter = memchr_iter(separator, &buffer).rev();
                     let last_line_end = sep_iter.next();
                     if sep_iter.next().is_some() {
diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs
index 629ebb714..a304bf7c0 100644
--- a/src/uu/sort/src/ext_sort.rs
+++ b/src/uu/sort/src/ext_sort.rs
@@ -5,12 +5,13 @@
 // * For the full copyright and license information, please view the LICENSE
 // * file that was distributed with this source code.
 
-//! Sort big files by using files for storing intermediate chunks.
+//! Sort big files by using auxiliary files for storing intermediate chunks.
 //!
 //! Files are read into chunks of memory which are then sorted individually and
 //! written to temporary files. There are two threads: One sorter, and one reader/writer.
 //! The buffers for the individual chunks are recycled. There are two buffers.
 
+use std::cmp::Ordering;
 use std::io::{BufWriter, Write};
 use std::path::Path;
 use std::{
@@ -20,30 +21,19 @@ use std::{
     thread,
 };
 
+use itertools::Itertools;
+
 use tempdir::TempDir;
 
 use crate::{
     chunks::{self, Chunk},
-    merge::{self, FileMerger},
-    sort_by, GlobalSettings,
+    compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
 
-/// Iterator that wraps the
-pub struct ExtSortedMerger<'a> {
-    pub file_merger: FileMerger<'a>,
-    // Keep _tmp_dir around, as it is deleted when dropped.
-    _tmp_dir: TempDir,
-}
+const MIN_BUFFER_SIZE: usize = 8_000;
 
-/// Sort big files by using files for storing intermediate chunks.
-///
-/// # Returns
-///
-/// An iterator that merges intermediate files back together.
-pub fn ext_sort<'a>(
-    files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
-    settings: &'a GlobalSettings,
-) -> ExtSortedMerger<'a> {
+/// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
+pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
     let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
@@ -51,7 +41,7 @@ pub fn ext_sort<'a>(
         let settings = settings.clone();
         move || sorter(recycled_receiver, sorted_sender, settings)
     });
-    let chunks_read = reader_writer(
+    let read_result = reader_writer(
         files,
         &tmp_dir,
         if settings.zero_terminated {
@@ -66,13 +56,29 @@ pub fn ext_sort<'a>(
         sorted_receiver,
         recycled_sender,
     );
-    let files = (0..chunks_read)
-        .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
-        .collect::<Vec<_>>();
-
-    ExtSortedMerger {
-        file_merger: merge::merge(&files, settings),
-        _tmp_dir: tmp_dir,
+    match read_result {
+        ReadResult::WroteChunksToFile { chunks_written } => {
+            let files = (0..chunks_written)
+                .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
+                .collect::<Vec<_>>();
+            let mut merger = merge::merge(&files, settings);
+            merger.write_all(settings);
+        }
+        ReadResult::SortedSingleChunk(chunk) => {
+            output_sorted_lines(chunk.borrow_lines().iter(), settings);
+        }
+        ReadResult::SortedTwoChunks([a, b]) => {
+            let merged_iter = a
+                .borrow_lines()
+                .iter()
+                .merge_by(b.borrow_lines().iter(), |line_a, line_b| {
+                    compare_by(line_a, line_b, settings) != Ordering::Greater
+                });
+            output_sorted_lines(merged_iter, settings);
+        }
+        ReadResult::EmptyInput => {
+            // don't output anything
+        }
     }
 }
 
@@ -84,6 +90,21 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: Global
     }
 }
 
+/// Describes how we read the chunks from the input.
+enum ReadResult {
+    /// The input was empty. Nothing was read.
+    EmptyInput,
+    /// The input fits into a single Chunk, which was kept in memory.
+    SortedSingleChunk(Chunk),
+    /// The input fits into two chunks, which were kept in memory.
+    SortedTwoChunks([Chunk; 2]),
+    /// The input was read into multiple chunks, which were written to auxiliary files.
+    WroteChunksToFile {
+        /// The number of chunks written to auxiliary files.
+        chunks_written: usize,
+    },
+}
+
 /// The function that is executed on the reader/writer thread.
 ///
 /// # Returns
@@ -96,7 +117,7 @@ fn reader_writer(
     settings: GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> usize {
+) -> ReadResult {
     let mut sender_option = Some(sender);
     let mut file = files.next().unwrap();
 
@@ -106,21 +127,40 @@ fn reader_writer(
     for _ in 0..2 {
         chunks::read(
             &mut sender_option,
-            vec![0; buffer_size],
+            vec![0; MIN_BUFFER_SIZE],
+            Some(buffer_size),
             &mut carry_over,
             &mut file,
             &mut files,
             separator,
             Vec::new(),
             &settings,
-        )
+        );
+        if sender_option.is_none() {
+            // We have already read the whole input. Since we are in our first two reads,
+            // this means that we can fit the whole input into memory. Bypass writing below and
+            // handle this case in a more straightforward way.
+            return if let Ok(first_chunk) = receiver.recv() {
+                if let Ok(second_chunk) = receiver.recv() {
+                    ReadResult::SortedTwoChunks([first_chunk, second_chunk])
+                } else {
+                    ReadResult::SortedSingleChunk(first_chunk)
+                }
+            } else {
+                ReadResult::EmptyInput
+            };
+        }
     }
 
     let mut file_number = 0;
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
-            _ => return file_number,
+            _ => {
+                return ReadResult::WroteChunksToFile {
+                    chunks_written: file_number,
+                }
+            }
         };
 
         write(
@@ -129,13 +169,14 @@ fn reader_writer(
             separator,
         );
 
-        let (recycled_lines, recycled_buffer) = chunk.recycle();
-        file_number += 1;
+        file_number += 1;
+
+        let (recycled_lines, recycled_buffer) = chunk.recycle();
+
         chunks::read(
             &mut sender_option,
             recycled_buffer,
+            None,
             &mut carry_over,
             &mut file,
             &mut files,
diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs
index 6f7cdfed7..48d48ad40 100644
--- a/src/uu/sort/src/merge.rs
+++ b/src/uu/sort/src/merge.rs
@@ -108,6 +108,7 @@ fn reader(
         chunks::read(
             sender,
             recycled_buffer,
+            None,
             carry_over,
             file,
             &mut iter::empty(),
diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index b6ab5a2b1..78388a298 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -93,7 +93,10 @@ static THOUSANDS_SEP: char = ',';
 static NEGATIVE: char = '-';
 static POSITIVE: char = '+';
 
-static DEFAULT_BUF_SIZE: usize = std::usize::MAX;
+/// Choosing a higher buffer size does not result in performance improvements
+/// (at least not on my machine). TODO: In the future, we should also take the amount of
+/// available memory into consideration, instead of relying on this constant only.
+static DEFAULT_BUF_SIZE: usize = 1_000_000_000;
 
 #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)]
 enum SortMode {
@@ -127,7 +130,6 @@ pub struct GlobalSettings {
     zero_terminated: bool,
     buffer_size: usize,
     tmp_dir: PathBuf,
-    ext_sort: bool,
 }
 
 impl GlobalSettings {
@@ -189,7 +191,6 @@ impl Default for GlobalSettings {
             zero_terminated: false,
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
-            ext_sort: false,
         }
     }
 }
@@ -941,28 +942,15 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         env::set_var("RAYON_NUM_THREADS", &settings.threads);
     }
 
-    if matches.is_present(OPT_BUF_SIZE) {
-        settings.buffer_size = {
-            let input = matches
-                .value_of(OPT_BUF_SIZE)
-                .map(String::from)
-                .unwrap_or(format!("{}", DEFAULT_BUF_SIZE));
+    settings.buffer_size = matches
+        .value_of(OPT_BUF_SIZE)
+        .map(GlobalSettings::human_numeric_convert)
+        .unwrap_or(DEFAULT_BUF_SIZE);
 
-            GlobalSettings::human_numeric_convert(&input)
-        };
-        settings.ext_sort = true;
-    }
-
-    if matches.is_present(OPT_TMP_DIR) {
-        let result = matches
-            .value_of(OPT_TMP_DIR)
-            .map(String::from)
-            .unwrap_or(format!("{}", env::temp_dir().display()));
-        settings.tmp_dir = PathBuf::from(result);
-        settings.ext_sort = true;
-    } else {
-        settings.tmp_dir = env::temp_dir();
-    }
+    settings.tmp_dir = matches
+        .value_of(OPT_TMP_DIR)
+        .map(PathBuf::from)
+        .unwrap_or_else(env::temp_dir);
 
     settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
     settings.merge = matches.is_present(OPT_MERGE);
@@ -1047,7 +1035,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
     exec(&files, &settings)
 }
 
-fn output_sorted_lines<'a>(iter: impl Iterator<Item = Line<'a>>, settings: &GlobalSettings) {
+fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) {
     if settings.unique {
         print_sorted(
             iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
@@ -1067,34 +1055,10 @@ fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
             crash!(1, "only one file allowed with -c");
         }
         return check::check(files.first().unwrap(), settings);
-    } else if settings.ext_sort {
+    } else {
         let mut lines = files.iter().filter_map(open);
-        let mut sorted = ext_sort(&mut lines, &settings);
-        sorted.file_merger.write_all(settings);
-    } else {
-        let separator = if settings.zero_terminated { '\0' } else { '\n' };
-        let mut lines = vec![];
-        let mut full_string = String::new();
-
-        for mut file in files.iter().filter_map(open) {
-            crash_if_err!(1, file.read_to_string(&mut full_string));
-
-            if !full_string.ends_with(separator) {
-                full_string.push(separator);
-            }
-        }
-
-        if full_string.ends_with(separator) {
-            full_string.pop();
-        }
-
-        for line in full_string.split(if settings.zero_terminated { '\0' } else { '\n' }) {
-            lines.push(Line::create(line, &settings));
-        }
-
-        sort_by(&mut lines, &settings);
-        output_sorted_lines(lines.into_iter(), &settings);
+        ext_sort(&mut lines, &settings);
     }
     0
 }
@@ -1366,7 +1330,7 @@ fn version_compare(a: &str, b: &str) -> Ordering {
     }
 }
 
-fn print_sorted<'a, T: Iterator<Item = Line<'a>>>(iter: T, settings: &GlobalSettings) {
+fn print_sorted<'a, T: Iterator<Item = &'a Line<'a>>>(iter: T, settings: &GlobalSettings) {
     let mut writer = settings.out_writer();
     for line in iter {
         line.print(&mut writer, settings);
diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs
index e89d18054..59058d5bc 100644
--- a/tests/by-util/test_sort.rs
+++ b/tests/by-util/test_sort.rs
@@ -15,29 +15,18 @@ fn test_helper(file_name: &str, args: &str) {
         .stdout_is_fixture(format!("{}.expected.debug", file_name));
 }
 
-// FYI, the initialization size of our Line struct is 96 bytes.
-//
-// At very small buffer sizes, with that overhead we are certainly going
-// to overrun our buffer way, way, way too quickly because of these excess
-// bytes for the struct.
-//
-// For instance, seq 0..20000 > ...text = 108894 bytes
-// But overhead is 1920000 + 108894 = 2028894 bytes
-//
-// Or kjvbible-random.txt = 4332506 bytes, but minimum size of its
-// 99817 lines in memory * 96 bytes = 9582432 bytes
-//
-// Here, we test 108894 bytes with a 50K buffer
-//
 #[test]
-fn test_larger_than_specified_segment() {
-    new_ucmd!()
-        .arg("-n")
-        .arg("-S")
-        .arg("50K")
-        .arg("ext_sort.txt")
-        .succeeds()
-        .stdout_is_fixture("ext_sort.expected");
+fn test_buffer_sizes() {
+    let buffer_sizes = ["0", "50K", "1M", "1000G"];
+    for buffer_size in &buffer_sizes {
+        new_ucmd!()
+            .arg("-n")
+            .arg("-S")
+            .arg(buffer_size)
+            .arg("ext_sort.txt")
+            .succeeds()
+            .stdout_is_fixture("ext_sort.expected");
+    }
 }
 
 #[test]

From 088443276a6d0229dc93b5a9502ff3d09870d782 Mon Sep 17 00:00:00 2001
From: Michael Debertol
Date: Sat, 22 May 2021 14:00:07 +0200
Subject: [PATCH 2/2] sort: improve handling of buffer size cmd arg

Instead of overflowing when calculating the buffer size, use
saturating_{pow, mul}.

When failing to parse the buffer size, we now crash instead of silently
ignoring the error.
---
 src/uu/sort/src/sort.rs    | 54 ++++++++++++++++++++++----------------
 tests/by-util/test_sort.rs | 24 ++++++++++-------
 2 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index 78388a298..bc3b65492 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -93,10 +93,10 @@ static THOUSANDS_SEP: char = ',';
 static NEGATIVE: char = '-';
 static POSITIVE: char = '+';
 
-/// Choosing a higher buffer size does not result in performance improvements
-/// (at least not on my machine). TODO: In the future, we should also take the amount of
-/// available memory into consideration, instead of relying on this constant only.
-static DEFAULT_BUF_SIZE: usize = 1_000_000_000;
+// Choosing a higher buffer size does not result in performance improvements
+// (at least not on my machine). TODO: In the future, we should also take the amount of
+// available memory into consideration, instead of relying on this constant only.
+static DEFAULT_BUF_SIZE: usize = 1_000_000_000; // 1 GB
 
 #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)]
 enum SortMode {
@@ -133,24 +133,32 @@ pub struct GlobalSettings {
 }
 
 impl GlobalSettings {
-    // It's back to do conversions for command line opts!
-    // Probably want to do through numstrcmp somehow now?
-    fn human_numeric_convert(a: &str) -> usize {
-        let num_str = &a[get_leading_gen(a)];
-        let (_, suf_str) = a.split_at(num_str.len());
-        let num_usize = num_str
-            .parse::<usize>()
-            .expect("Error parsing buffer size: ");
-        let suf_usize: usize = match suf_str.to_uppercase().as_str() {
-            // SI Units
-            "B" => 1usize,
-            "K" => 1000usize,
-            "M" => 1000000usize,
-            "G" => 1000000000usize,
-            // GNU regards empty human numeric values as K by default
-            _ => 1000usize,
-        };
-        num_usize * suf_usize
+    /// Interpret this `&str` as a number with an optional trailing SI unit.
+    ///
+    /// If there is no trailing SI unit, the implicit unit is K.
+    /// The suffix B causes the number to be interpreted as a byte count.
+    fn parse_byte_count(input: &str) -> usize {
+        const SI_UNITS: &[char] = &['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'];
+
+        let input = input.trim();
+
+        let (num_str, si_unit) =
+            if input.ends_with(|c: char| SI_UNITS.contains(&c.to_ascii_uppercase())) {
+                let mut chars = input.chars();
+                let si_suffix = chars.next_back().unwrap().to_ascii_uppercase();
+                let si_unit = SI_UNITS.iter().position(|&c| c == si_suffix).unwrap();
+                let num_str = chars.as_str();
+                (num_str, si_unit)
+            } else {
+                (input, 1)
+            };
+
+        let num_usize: usize = num_str
+            .trim()
+            .parse()
+            .unwrap_or_else(|e| crash!(1, "failed to parse buffer size `{}`: {}", num_str, e));
+
+        num_usize.saturating_mul(1000usize.saturating_pow(si_unit as u32))
     }
 
     fn out_writer(&self) -> BufWriter<Box<dyn Write>> {
@@ -944,7 +952,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
 
     settings.buffer_size = matches
         .value_of(OPT_BUF_SIZE)
-        .map(GlobalSettings::human_numeric_convert)
+        .map(GlobalSettings::parse_byte_count)
         .unwrap_or(DEFAULT_BUF_SIZE);
 
     settings.tmp_dir = matches
diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs
index 59058d5bc..23705d2ee 100644
--- a/tests/by-util/test_sort.rs
+++ b/tests/by-util/test_sort.rs
@@ -17,7 +17,9 @@ fn test_helper(file_name: &str, args: &str) {
 
 #[test]
 fn test_buffer_sizes() {
-    let buffer_sizes = ["0", "50K", "1M", "1000G"];
+    let buffer_sizes = [
+        "0", "50K", "50k", "1M", "100M", "1000G", "10T", "500E", "1Y",
+    ];
     for buffer_size in &buffer_sizes {
         new_ucmd!()
             .arg("-n")
@@ -30,14 +32,18 @@ fn test_buffer_sizes() {
 }
 
 #[test]
-fn test_smaller_than_specified_segment() {
-    new_ucmd!()
-        .arg("-n")
-        .arg("-S")
-        .arg("100M")
-        .arg("ext_sort.txt")
-        .succeeds()
-        .stdout_is_fixture("ext_sort.expected");
+fn test_invalid_buffer_size() {
+    let buffer_sizes = ["asd", "100f"];
+    for invalid_buffer_size in &buffer_sizes {
+        new_ucmd!()
+            .arg("-S")
+            .arg(invalid_buffer_size)
+            .fails()
+            .stderr_only(format!(
+                "sort: error: failed to parse buffer size `{}`: invalid digit found in string",
+                invalid_buffer_size
+            ));
+    }
 }
 
 #[test]
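
Editor's note (not part of either patch): the suffix rules implemented by `parse_byte_count` can be sanity-checked with a small standalone sketch. The version below returns a `Result` instead of calling uucore's `crash!` macro so that it compiles on its own; the constant and the saturating arithmetic mirror the hunk above, while `main` and its assertions are illustrative only.

use std::num::ParseIntError;

// Standalone restatement of the rules from PATCH 2/2: an optional
// one-letter SI suffix scales the number by 1000^i, where i is the
// suffix's index in SI_UNITS; a missing suffix means kilobytes.
fn parse_byte_count(input: &str) -> Result<usize, ParseIntError> {
    const SI_UNITS: &[char] = &['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'];

    let input = input.trim();

    let (num_str, si_unit) =
        if input.ends_with(|c: char| SI_UNITS.contains(&c.to_ascii_uppercase())) {
            let mut chars = input.chars();
            let si_suffix = chars.next_back().unwrap().to_ascii_uppercase();
            let si_unit = SI_UNITS.iter().position(|&c| c == si_suffix).unwrap();
            (chars.as_str(), si_unit)
        } else {
            // No suffix: like GNU sort, treat the number as kilobytes.
            (input, 1)
        };

    let num: usize = num_str.trim().parse()?;
    // 'B' sits at index 0, so it scales by 1000^0 = 1 (a plain byte count);
    // oversized suffixes saturate at usize::MAX instead of overflowing.
    Ok(num.saturating_mul(1000usize.saturating_pow(si_unit as u32)))
}

fn main() {
    assert_eq!(parse_byte_count("50K").unwrap(), 50_000);
    assert_eq!(parse_byte_count("100").unwrap(), 100_000); // implicit K
    assert_eq!(parse_byte_count("2b").unwrap(), 2); // B suffix: bytes
    assert_eq!(parse_byte_count("1Y").unwrap(), usize::MAX); // 1000^8 saturates
    assert!(parse_byte_count("100f").is_err()); // cf. test_invalid_buffer_size
    assert!(parse_byte_count("asd").is_err());
}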