1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-28 11:37:44 +00:00

Merge pull request #2246 from miDeb/sort-automatic-extsort

sort: automatically fall back to extsort
This commit is contained in:
Sylvestre Ledru 2021-05-22 17:21:02 +02:00 committed by GitHub
commit 542deb8888
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 178 additions and 138 deletions

View file

@ -72,7 +72,8 @@ Run `cargo build --release` before benchmarking after you make a change!
## External sorting ## External sorting
Try running commands with the `-S` option set to an amount of memory to be used, such as `1M`. Additionally, you could try sorting Try running commands with the `-S` option set to an amount of memory to be used, such as `1M`. Additionally, you could try sorting
huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt` huge files (ideally multiple Gigabytes) with `-S` (or without `-S` to benchmark with our default value).
Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
multiple times (this will add the contents of `shuffled_wordlist.txt` to itself). multiple times (this will add the contents of `shuffled_wordlist.txt` to itself).
Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'` Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'`

View file

@ -87,6 +87,7 @@ fn reader(
chunks::read( chunks::read(
&mut sender, &mut sender,
recycled_buffer, recycled_buffer,
None,
&mut carry_over, &mut carry_over,
&mut file, &mut file,
&mut iter::empty(), &mut iter::empty(),

View file

@ -52,13 +52,20 @@ impl Chunk {
/// Read a chunk, parse lines and send them. /// Read a chunk, parse lines and send them.
/// ///
/// No empty chunk will be sent. /// No empty chunk will be sent. If we reach the end of the input, sender_option
/// is set to None. If this function however does not set sender_option to None,
/// it is not guaranteed that there is still input left: If the input fits _exactly_
/// into a buffer, we will only notice that there's nothing more to read at the next
/// invocation.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `sender_option`: The sender to send the lines to the sorter. If `None`, does nothing. /// (see also `read_to_chunk` for a more detailed documentation)
///
/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
/// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled. /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`) /// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
/// * `max_buffer_size`: How big `buffer` can be.
/// * `carry_over`: The bytes that must be carried over in between invocations. /// * `carry_over`: The bytes that must be carried over in between invocations.
/// * `file`: The current file. /// * `file`: The current file.
/// * `next_files`: What `file` should be updated to next. /// * `next_files`: What `file` should be updated to next.
@ -69,6 +76,7 @@ impl Chunk {
pub fn read( pub fn read(
sender_option: &mut Option<SyncSender<Chunk>>, sender_option: &mut Option<SyncSender<Chunk>>,
mut buffer: Vec<u8>, mut buffer: Vec<u8>,
max_buffer_size: Option<usize>,
carry_over: &mut Vec<u8>, carry_over: &mut Vec<u8>,
file: &mut Box<dyn Read + Send>, file: &mut Box<dyn Read + Send>,
next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>, next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
@ -82,8 +90,14 @@ pub fn read(
buffer.resize(carry_over.len() + 10 * 1024, 0); buffer.resize(carry_over.len() + 10 * 1024, 0);
} }
buffer[..carry_over.len()].copy_from_slice(&carry_over); buffer[..carry_over.len()].copy_from_slice(&carry_over);
let (read, should_continue) = let (read, should_continue) = read_to_buffer(
read_to_buffer(file, next_files, &mut buffer, carry_over.len(), separator); file,
next_files,
&mut buffer,
max_buffer_size,
carry_over.len(),
separator,
);
carry_over.clear(); carry_over.clear();
carry_over.extend_from_slice(&buffer[read..]); carry_over.extend_from_slice(&buffer[read..]);
@ -138,7 +152,8 @@ fn parse_lines<'a>(
/// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`, /// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
/// and this function continues reading. /// and this function continues reading.
/// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset` /// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
/// as well). It will not be grown by default, unless that is necessary to read at least two lines. /// as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines.
/// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines.
/// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over /// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
/// from the previous read and should not be overwritten. /// from the previous read and should not be overwritten.
/// * `separator`: The byte that separates lines. /// * `separator`: The byte that separates lines.
@ -153,6 +168,7 @@ fn read_to_buffer(
file: &mut Box<dyn Read + Send>, file: &mut Box<dyn Read + Send>,
next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>, next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
max_buffer_size: Option<usize>,
start_offset: usize, start_offset: usize,
separator: u8, separator: u8,
) -> (usize, bool) { ) -> (usize, bool) {
@ -162,6 +178,19 @@ fn read_to_buffer(
Ok(0) => { Ok(0) => {
if read_target.is_empty() { if read_target.is_empty() {
// chunk is full // chunk is full
if let Some(max_buffer_size) = max_buffer_size {
if max_buffer_size > buffer.len() {
// we can grow the buffer
let prev_len = buffer.len();
if buffer.len() < max_buffer_size / 2 {
buffer.resize(buffer.len() * 2, 0);
} else {
buffer.resize(max_buffer_size, 0);
}
read_target = &mut buffer[prev_len..];
continue;
}
}
let mut sep_iter = memchr_iter(separator, &buffer).rev(); let mut sep_iter = memchr_iter(separator, &buffer).rev();
let last_line_end = sep_iter.next(); let last_line_end = sep_iter.next();
if sep_iter.next().is_some() { if sep_iter.next().is_some() {

View file

@ -5,12 +5,13 @@
// * For the full copyright and license information, please view the LICENSE // * For the full copyright and license information, please view the LICENSE
// * file that was distributed with this source code. // * file that was distributed with this source code.
//! Sort big files by using files for storing intermediate chunks. //! Sort big files by using auxiliary files for storing intermediate chunks.
//! //!
//! Files are read into chunks of memory which are then sorted individually and //! Files are read into chunks of memory which are then sorted individually and
//! written to temporary files. There are two threads: One sorter, and one reader/writer. //! written to temporary files. There are two threads: One sorter, and one reader/writer.
//! The buffers for the individual chunks are recycled. There are two buffers. //! The buffers for the individual chunks are recycled. There are two buffers.
use std::cmp::Ordering;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::path::Path; use std::path::Path;
use std::{ use std::{
@ -20,30 +21,19 @@ use std::{
thread, thread,
}; };
use itertools::Itertools;
use tempdir::TempDir; use tempdir::TempDir;
use crate::{ use crate::{
chunks::{self, Chunk}, chunks::{self, Chunk},
merge::{self, FileMerger}, compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
sort_by, GlobalSettings,
}; };
/// Iterator that wraps the const MIN_BUFFER_SIZE: usize = 8_000;
pub struct ExtSortedMerger<'a> {
pub file_merger: FileMerger<'a>,
// Keep _tmp_dir around, as it is deleted when dropped.
_tmp_dir: TempDir,
}
/// Sort big files by using files for storing intermediate chunks. /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
/// pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
/// # Returns
///
/// An iterator that merges intermediate files back together.
pub fn ext_sort<'a>(
files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
settings: &'a GlobalSettings,
) -> ExtSortedMerger<'a> {
let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort")); let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1); let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1); let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
@ -51,7 +41,7 @@ pub fn ext_sort<'a>(
let settings = settings.clone(); let settings = settings.clone();
move || sorter(recycled_receiver, sorted_sender, settings) move || sorter(recycled_receiver, sorted_sender, settings)
}); });
let chunks_read = reader_writer( let read_result = reader_writer(
files, files,
&tmp_dir, &tmp_dir,
if settings.zero_terminated { if settings.zero_terminated {
@ -66,13 +56,29 @@ pub fn ext_sort<'a>(
sorted_receiver, sorted_receiver,
recycled_sender, recycled_sender,
); );
let files = (0..chunks_read) match read_result {
ReadResult::WroteChunksToFile { chunks_written } => {
let files = (0..chunks_written)
.map(|chunk_num| tmp_dir.path().join(chunk_num.to_string())) .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut merger = merge::merge(&files, settings);
ExtSortedMerger { merger.write_all(settings);
file_merger: merge::merge(&files, settings), }
_tmp_dir: tmp_dir, ReadResult::SortedSingleChunk(chunk) => {
output_sorted_lines(chunk.borrow_lines().iter(), settings);
}
ReadResult::SortedTwoChunks([a, b]) => {
let merged_iter = a
.borrow_lines()
.iter()
.merge_by(b.borrow_lines().iter(), |line_a, line_b| {
compare_by(line_a, line_b, settings) != Ordering::Greater
});
output_sorted_lines(merged_iter, settings);
}
ReadResult::EmptyInput => {
// don't output anything
}
} }
} }
@ -84,6 +90,21 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: Global
} }
} }
/// Describes how we read the chunks from the input.
enum ReadResult {
/// The input was empty. Nothing was read.
EmptyInput,
/// The input fits into a single Chunk, which was kept in memory.
SortedSingleChunk(Chunk),
/// The input fits into two chunks, which were kept in memory.
SortedTwoChunks([Chunk; 2]),
/// The input was read into multiple chunks, which were written to auxiliary files.
WroteChunksToFile {
/// The number of chunks written to auxiliary files.
chunks_written: usize,
},
}
/// The function that is executed on the reader/writer thread. /// The function that is executed on the reader/writer thread.
/// ///
/// # Returns /// # Returns
@ -96,7 +117,7 @@ fn reader_writer(
settings: GlobalSettings, settings: GlobalSettings,
receiver: Receiver<Chunk>, receiver: Receiver<Chunk>,
sender: SyncSender<Chunk>, sender: SyncSender<Chunk>,
) -> usize { ) -> ReadResult {
let mut sender_option = Some(sender); let mut sender_option = Some(sender);
let mut file = files.next().unwrap(); let mut file = files.next().unwrap();
@ -106,21 +127,40 @@ fn reader_writer(
for _ in 0..2 { for _ in 0..2 {
chunks::read( chunks::read(
&mut sender_option, &mut sender_option,
vec![0; buffer_size], vec![0; MIN_BUFFER_SIZE],
Some(buffer_size),
&mut carry_over, &mut carry_over,
&mut file, &mut file,
&mut files, &mut files,
separator, separator,
Vec::new(), Vec::new(),
&settings, &settings,
) );
if sender_option.is_none() {
// We have already read the whole input. Since we are in our first two reads,
// this means that we can fit the whole input into memory. Bypass writing below and
// handle this case in a more straightforward way.
return if let Ok(first_chunk) = receiver.recv() {
if let Ok(second_chunk) = receiver.recv() {
ReadResult::SortedTwoChunks([first_chunk, second_chunk])
} else {
ReadResult::SortedSingleChunk(first_chunk)
}
} else {
ReadResult::EmptyInput
};
}
} }
let mut file_number = 0; let mut file_number = 0;
loop { loop {
let mut chunk = match receiver.recv() { let mut chunk = match receiver.recv() {
Ok(it) => it, Ok(it) => it,
_ => return file_number, _ => {
return ReadResult::WroteChunksToFile {
chunks_written: file_number,
}
}
}; };
write( write(
@ -129,13 +169,14 @@ fn reader_writer(
separator, separator,
); );
let (recycled_lines, recycled_buffer) = chunk.recycle();
file_number += 1; file_number += 1;
let (recycled_lines, recycled_buffer) = chunk.recycle();
chunks::read( chunks::read(
&mut sender_option, &mut sender_option,
recycled_buffer, recycled_buffer,
None,
&mut carry_over, &mut carry_over,
&mut file, &mut file,
&mut files, &mut files,

View file

@ -108,6 +108,7 @@ fn reader(
chunks::read( chunks::read(
sender, sender,
recycled_buffer, recycled_buffer,
None,
carry_over, carry_over,
file, file,
&mut iter::empty(), &mut iter::empty(),

View file

@ -93,7 +93,10 @@ static THOUSANDS_SEP: char = ',';
static NEGATIVE: char = '-'; static NEGATIVE: char = '-';
static POSITIVE: char = '+'; static POSITIVE: char = '+';
static DEFAULT_BUF_SIZE: usize = std::usize::MAX; // Choosing a higher buffer size does not result in performance improvements
// (at least not on my machine). TODO: In the future, we should also take the amount of
// available memory into consideration, instead of relying on this constant only.
static DEFAULT_BUF_SIZE: usize = 1_000_000_000; // 1 GB
#[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)] #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)]
enum SortMode { enum SortMode {
@ -127,28 +130,35 @@ pub struct GlobalSettings {
zero_terminated: bool, zero_terminated: bool,
buffer_size: usize, buffer_size: usize,
tmp_dir: PathBuf, tmp_dir: PathBuf,
ext_sort: bool,
} }
impl GlobalSettings { impl GlobalSettings {
// It's back to do conversions for command line opts! /// Interpret this `&str` as a number with an optional trailing si unit.
// Probably want to do through numstrcmp somehow now? ///
fn human_numeric_convert(a: &str) -> usize { /// If there is no trailing si unit, the implicit unit is K.
let num_str = &a[get_leading_gen(a)]; /// The suffix B causes the number to be interpreted as a byte count.
let (_, suf_str) = a.split_at(num_str.len()); fn parse_byte_count(input: &str) -> usize {
let num_usize = num_str const SI_UNITS: &[char] = &['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'];
.parse::<usize>()
.expect("Error parsing buffer size: "); let input = input.trim();
let suf_usize: usize = match suf_str.to_uppercase().as_str() {
// SI Units let (num_str, si_unit) =
"B" => 1usize, if input.ends_with(|c: char| SI_UNITS.contains(&c.to_ascii_uppercase())) {
"K" => 1000usize, let mut chars = input.chars();
"M" => 1000000usize, let si_suffix = chars.next_back().unwrap().to_ascii_uppercase();
"G" => 1000000000usize, let si_unit = SI_UNITS.iter().position(|&c| c == si_suffix).unwrap();
// GNU regards empty human numeric values as K by default let num_str = chars.as_str();
_ => 1000usize, (num_str, si_unit)
} else {
(input, 1)
}; };
num_usize * suf_usize
let num_usize: usize = num_str
.trim()
.parse()
.unwrap_or_else(|e| crash!(1, "failed to parse buffer size `{}`: {}", num_str, e));
num_usize.saturating_mul(1000usize.saturating_pow(si_unit as u32))
} }
fn out_writer(&self) -> BufWriter<Box<dyn Write>> { fn out_writer(&self) -> BufWriter<Box<dyn Write>> {
@ -189,7 +199,6 @@ impl Default for GlobalSettings {
zero_terminated: false, zero_terminated: false,
buffer_size: DEFAULT_BUF_SIZE, buffer_size: DEFAULT_BUF_SIZE,
tmp_dir: PathBuf::new(), tmp_dir: PathBuf::new(),
ext_sort: false,
} }
} }
} }
@ -941,28 +950,15 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
env::set_var("RAYON_NUM_THREADS", &settings.threads); env::set_var("RAYON_NUM_THREADS", &settings.threads);
} }
if matches.is_present(OPT_BUF_SIZE) { settings.buffer_size = matches
settings.buffer_size = {
let input = matches
.value_of(OPT_BUF_SIZE) .value_of(OPT_BUF_SIZE)
.map(String::from) .map(GlobalSettings::parse_byte_count)
.unwrap_or(format!("{}", DEFAULT_BUF_SIZE)); .unwrap_or(DEFAULT_BUF_SIZE);
GlobalSettings::human_numeric_convert(&input) settings.tmp_dir = matches
};
settings.ext_sort = true;
}
if matches.is_present(OPT_TMP_DIR) {
let result = matches
.value_of(OPT_TMP_DIR) .value_of(OPT_TMP_DIR)
.map(String::from) .map(PathBuf::from)
.unwrap_or(format!("{}", env::temp_dir().display())); .unwrap_or_else(env::temp_dir);
settings.tmp_dir = PathBuf::from(result);
settings.ext_sort = true;
} else {
settings.tmp_dir = env::temp_dir();
}
settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED); settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
settings.merge = matches.is_present(OPT_MERGE); settings.merge = matches.is_present(OPT_MERGE);
@ -1047,7 +1043,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
exec(&files, &settings) exec(&files, &settings)
} }
fn output_sorted_lines<'a>(iter: impl Iterator<Item = Line<'a>>, settings: &GlobalSettings) { fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) {
if settings.unique { if settings.unique {
print_sorted( print_sorted(
iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal), iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
@ -1067,34 +1063,10 @@ fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
crash!(1, "only one file allowed with -c"); crash!(1, "only one file allowed with -c");
} }
return check::check(files.first().unwrap(), settings); return check::check(files.first().unwrap(), settings);
} else if settings.ext_sort { } else {
let mut lines = files.iter().filter_map(open); let mut lines = files.iter().filter_map(open);
let mut sorted = ext_sort(&mut lines, &settings); ext_sort(&mut lines, &settings);
sorted.file_merger.write_all(settings);
} else {
let separator = if settings.zero_terminated { '\0' } else { '\n' };
let mut lines = vec![];
let mut full_string = String::new();
for mut file in files.iter().filter_map(open) {
crash_if_err!(1, file.read_to_string(&mut full_string));
if !full_string.ends_with(separator) {
full_string.push(separator);
}
}
if full_string.ends_with(separator) {
full_string.pop();
}
for line in full_string.split(if settings.zero_terminated { '\0' } else { '\n' }) {
lines.push(Line::create(line, &settings));
}
sort_by(&mut lines, &settings);
output_sorted_lines(lines.into_iter(), &settings);
} }
0 0
} }
@ -1366,7 +1338,7 @@ fn version_compare(a: &str, b: &str) -> Ordering {
} }
} }
fn print_sorted<'a, T: Iterator<Item = Line<'a>>>(iter: T, settings: &GlobalSettings) { fn print_sorted<'a, T: Iterator<Item = &'a Line<'a>>>(iter: T, settings: &GlobalSettings) {
let mut writer = settings.out_writer(); let mut writer = settings.out_writer();
for line in iter { for line in iter {
line.print(&mut writer, settings); line.print(&mut writer, settings);

View file

@ -15,40 +15,35 @@ fn test_helper(file_name: &str, args: &str) {
.stdout_is_fixture(format!("{}.expected.debug", file_name)); .stdout_is_fixture(format!("{}.expected.debug", file_name));
} }
// FYI, the initialization size of our Line struct is 96 bytes.
//
// At very small buffer sizes, with that overhead we are certainly going
// to overrun our buffer way, way, way too quickly because of these excess
// bytes for the struct.
//
// For instance, seq 0..20000 > ...text = 108894 bytes
// But overhead is 1920000 + 108894 = 2028894 bytes
//
// Or kjvbible-random.txt = 4332506 bytes, but minimum size of its
// 99817 lines in memory * 96 bytes = 9582432 bytes
//
// Here, we test 108894 bytes with a 50K buffer
//
#[test] #[test]
fn test_larger_than_specified_segment() { fn test_buffer_sizes() {
let buffer_sizes = [
"0", "50K", "50k", "1M", "100M", "1000G", "10T", "500E", "1Y",
];
for buffer_size in &buffer_sizes {
new_ucmd!() new_ucmd!()
.arg("-n") .arg("-n")
.arg("-S") .arg("-S")
.arg("50K") .arg(buffer_size)
.arg("ext_sort.txt") .arg("ext_sort.txt")
.succeeds() .succeeds()
.stdout_is_fixture("ext_sort.expected"); .stdout_is_fixture("ext_sort.expected");
}
} }
#[test] #[test]
fn test_smaller_than_specified_segment() { fn test_invalid_buffer_size() {
let buffer_sizes = ["asd", "100f"];
for invalid_buffer_size in &buffer_sizes {
new_ucmd!() new_ucmd!()
.arg("-n")
.arg("-S") .arg("-S")
.arg("100M") .arg(invalid_buffer_size)
.arg("ext_sort.txt") .fails()
.succeeds() .stderr_only(format!(
.stdout_is_fixture("ext_sort.expected"); "sort: error: failed to parse buffer size `{}`: invalid digit found in string",
invalid_buffer_size
));
}
} }
#[test] #[test]