
sort: automatically fall back to extsort

To make this work, we make the default sort a special case of external sort.

External sorting uses auxiliary files for intermediate chunks. However,
when we can keep our intermediate chunks in memory, we don't write them
to the file system at all. Only when we notice that we can't keep them
in memory are they written to disk.

Additionally, we no longer allocate buffers at their maximum size up
front. Instead, they start with a capacity of 8 KB and are grown only
when needed.

This makes sorting smaller files about as fast as it was before
(I'm seeing a regression of ~3%), and allows us to seamlessly continue
with auxiliary files when needed.
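
For illustration, here is a minimal, self-contained sketch of the fallback strategy (all names here, such as `sort_with_fallback` and `MAX_IN_MEMORY_CHUNKS`, are made up for this example; the real implementation below streams bytes, recycles buffers between reads, and merges spilled chunks from a temporary directory):

use std::io::{BufRead, BufReader, Read};

const MAX_IN_MEMORY_CHUNKS: usize = 2;

enum ReadResult {
    EmptyInput,
    SortedInMemory(Vec<Vec<String>>), // everything fit into <= 2 chunks
    WroteChunksToFile { chunks_written: usize }, // fell back to aux files
}

fn sort_with_fallback(input: impl Read, lines_per_chunk: usize) -> ReadResult {
    let mut reader = BufReader::new(input);
    let mut chunks: Vec<Vec<String>> = Vec::new();
    loop {
        // Read up to `lines_per_chunk` lines into the next chunk.
        let mut chunk = Vec::new();
        let mut line = String::new();
        while chunk.len() < lines_per_chunk
            && reader.read_line(&mut line).unwrap_or(0) != 0
        {
            chunk.push(line.trim_end().to_string());
            line.clear();
        }
        if chunk.is_empty() {
            break; // end of input
        }
        chunk.sort();
        if chunks.len() == MAX_IN_MEMORY_CHUNKS {
            // A third chunk exists: the input does not fit in memory. From
            // here the real code writes every sorted chunk to an auxiliary
            // file and merges those files at the end (omitted here).
            return ReadResult::WroteChunksToFile {
                chunks_written: chunks.len() + 1,
            };
        }
        chunks.push(chunk);
    }
    if chunks.is_empty() {
        ReadResult::EmptyInput
    } else {
        ReadResult::SortedInMemory(chunks)
    }
}

fn main() {
    let input = "banana\napple\ncherry\n".as_bytes();
    match sort_with_fallback(input, 2) {
        ReadResult::SortedInMemory(chunks) => println!("{} in-memory chunks", chunks.len()),
        ReadResult::WroteChunksToFile { chunks_written } => {
            println!("spilled {} chunks", chunks_written)
        }
        ReadResult::EmptyInput => println!("empty"),
    }
}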
Michael Debertol 2021-05-21 23:00:13 +02:00
parent df45b20dc1
commit e7da8058dc
7 changed files with 138 additions and 112 deletions

@@ -72,7 +72,8 @@ Run `cargo build --release` before benchmarking after you make a change!
 ## External sorting
 Try running commands with the `-S` option set to an amount of memory to be used, such as `1M`. Additionally, you could try sorting
-huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
+huge files (ideally multiple Gigabytes) with `-S` (or without `-S` to benchmark with our default value).
+Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
 multiple times (this will add the contents of `shuffled_wordlist.txt` to itself).
 Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'`

@@ -87,6 +87,7 @@ fn reader(
         chunks::read(
             &mut sender,
             recycled_buffer,
+            None,
             &mut carry_over,
             &mut file,
             &mut iter::empty(),

@@ -52,13 +52,20 @@ impl Chunk {
 /// Read a chunk, parse lines and send them.
 ///
-/// No empty chunk will be sent.
+/// No empty chunk will be sent. If we reach the end of the input, sender_option
+/// is set to None. If this function however does not set sender_option to None,
+/// it is not guaranteed that there is still input left: If the input fits _exactly_
+/// into a buffer, we will only notice that there's nothing more to read at the next
+/// invocation.
 ///
 /// # Arguments
 ///
-/// * `sender_option`: The sender to send the lines to the sorter. If `None`, does nothing.
+/// (see also `read_to_chunk` for a more detailed documentation)
+///
+/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
 /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
 ///   (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
+/// * `max_buffer_size`: How big `buffer` can be.
 /// * `carry_over`: The bytes that must be carried over in between invocations.
 /// * `file`: The current file.
 /// * `next_files`: What `file` should be updated to next.
@@ -69,6 +76,7 @@ impl Chunk {
 pub fn read(
     sender_option: &mut Option<SyncSender<Chunk>>,
     mut buffer: Vec<u8>,
+    max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
     file: &mut Box<dyn Read + Send>,
     next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
@@ -82,8 +90,14 @@ pub fn read(
         buffer.resize(carry_over.len() + 10 * 1024, 0);
     }
     buffer[..carry_over.len()].copy_from_slice(&carry_over);
-    let (read, should_continue) =
-        read_to_buffer(file, next_files, &mut buffer, carry_over.len(), separator);
+    let (read, should_continue) = read_to_buffer(
+        file,
+        next_files,
+        &mut buffer,
+        max_buffer_size,
+        carry_over.len(),
+        separator,
+    );
     carry_over.clear();
     carry_over.extend_from_slice(&buffer[read..]);
@@ -138,7 +152,8 @@ fn parse_lines<'a>(
 /// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
 ///   and this function continues reading.
 /// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
-///   as well). It will not be grown by default, unless that is necessary to read at least two lines.
+///   as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines.
+/// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines.
 /// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
 ///   from the previous read and should not be overwritten.
 /// * `separator`: The byte that separates lines.
@@ -153,6 +168,7 @@ fn read_to_buffer(
     file: &mut Box<dyn Read + Send>,
     next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
     buffer: &mut Vec<u8>,
+    max_buffer_size: Option<usize>,
     start_offset: usize,
     separator: u8,
 ) -> (usize, bool) {
@@ -162,6 +178,19 @@ fn read_to_buffer(
             Ok(0) => {
                 if read_target.is_empty() {
                     // chunk is full
+                    if let Some(max_buffer_size) = max_buffer_size {
+                        if max_buffer_size > buffer.len() {
+                            // we can grow the buffer
+                            let prev_len = buffer.len();
+                            if buffer.len() < max_buffer_size / 2 {
+                                buffer.resize(buffer.len() * 2, 0);
+                            } else {
+                                buffer.resize(max_buffer_size, 0);
+                            }
+                            read_target = &mut buffer[prev_len..];
+                            continue;
+                        }
+                    }
                     let mut sep_iter = memchr_iter(separator, &buffer).rev();
                     let last_line_end = sep_iter.next();
                     if sep_iter.next().is_some() {
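
As an aside, the doubling schedule above grows the buffer geometrically and clamps the final step to the limit. A standalone sketch of the sizes this produces for MIN_BUFFER_SIZE = 8_000 and a 1 MB limit (`next_capacity` is a hypothetical helper written for this note, not part of the commit):

// Sketch of the growth schedule implemented above: double until one more
// doubling would overshoot `max`, then clamp to exactly `max`.
fn next_capacity(len: usize, max: usize) -> usize {
    if len < max / 2 {
        len * 2
    } else {
        max
    }
}

fn main() {
    let max = 1_000_000; // e.g. `-S 1M`
    let mut len = 8_000; // MIN_BUFFER_SIZE
    while len < max {
        len = next_capacity(len, max);
        println!("{}", len); // 16000, 32000, ..., 512000, 1000000
    }
}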

@@ -5,12 +5,13 @@
 // * For the full copyright and license information, please view the LICENSE
 // * file that was distributed with this source code.
-//! Sort big files by using files for storing intermediate chunks.
+//! Sort big files by using auxiliary files for storing intermediate chunks.
 //!
 //! Files are read into chunks of memory which are then sorted individually and
 //! written to temporary files. There are two threads: One sorter, and one reader/writer.
 //! The buffers for the individual chunks are recycled. There are two buffers.
+use std::cmp::Ordering;
 use std::io::{BufWriter, Write};
 use std::path::Path;
 use std::{
@@ -20,30 +21,19 @@ use std::{
     thread,
 };

+use itertools::Itertools;
 use tempdir::TempDir;

 use crate::{
     chunks::{self, Chunk},
-    merge::{self, FileMerger},
-    sort_by, GlobalSettings,
+    compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };

-/// Iterator that wraps the
-pub struct ExtSortedMerger<'a> {
-    pub file_merger: FileMerger<'a>,
-    // Keep _tmp_dir around, as it is deleted when dropped.
-    _tmp_dir: TempDir,
-}
+const MIN_BUFFER_SIZE: usize = 8_000;

-/// Sort big files by using files for storing intermediate chunks.
-///
-/// # Returns
-///
-/// An iterator that merges intermediate files back together.
-pub fn ext_sort<'a>(
-    files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
-    settings: &'a GlobalSettings,
-) -> ExtSortedMerger<'a> {
+/// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
+pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
     let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
@@ -51,7 +41,7 @@ pub fn ext_sort<'a>(
         let settings = settings.clone();
         move || sorter(recycled_receiver, sorted_sender, settings)
     });
-    let chunks_read = reader_writer(
+    let read_result = reader_writer(
         files,
         &tmp_dir,
         if settings.zero_terminated {
@@ -66,13 +56,29 @@ pub fn ext_sort<'a>(
         sorted_receiver,
         recycled_sender,
     );
-    let files = (0..chunks_read)
+    match read_result {
+        ReadResult::WroteChunksToFile { chunks_written } => {
+            let files = (0..chunks_written)
                 .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
                 .collect::<Vec<_>>();
-    ExtSortedMerger {
-        file_merger: merge::merge(&files, settings),
-        _tmp_dir: tmp_dir,
+            let mut merger = merge::merge(&files, settings);
+            merger.write_all(settings);
+        }
+        ReadResult::SortedSingleChunk(chunk) => {
+            output_sorted_lines(chunk.borrow_lines().iter(), settings);
+        }
+        ReadResult::SortedTwoChunks([a, b]) => {
+            let merged_iter = a
+                .borrow_lines()
+                .iter()
+                .merge_by(b.borrow_lines().iter(), |line_a, line_b| {
+                    compare_by(line_a, line_b, settings) != Ordering::Greater
+                });
+            output_sorted_lines(merged_iter, settings);
+        }
+        ReadResult::EmptyInput => {
+            // don't output anything
+        }
     }
 }
@@ -84,6 +90,21 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: Global
     }
 }

+/// Describes how we read the chunks from the input.
+enum ReadResult {
+    /// The input was empty. Nothing was read.
+    EmptyInput,
+    /// The input fits into a single Chunk, which was kept in memory.
+    SortedSingleChunk(Chunk),
+    /// The input fits into two chunks, which were kept in memory.
+    SortedTwoChunks([Chunk; 2]),
+    /// The input was read into multiple chunks, which were written to auxiliary files.
+    WroteChunksToFile {
+        /// The number of chunks written to auxiliary files.
+        chunks_written: usize,
+    },
+}
 /// The function that is executed on the reader/writer thread.
 ///
 /// # Returns
@@ -96,7 +117,7 @@ fn reader_writer(
     settings: GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> usize {
+) -> ReadResult {
     let mut sender_option = Some(sender);
     let mut file = files.next().unwrap();
@@ -106,21 +127,40 @@ fn reader_writer(
     for _ in 0..2 {
         chunks::read(
             &mut sender_option,
-            vec![0; buffer_size],
+            vec![0; MIN_BUFFER_SIZE],
+            Some(buffer_size),
             &mut carry_over,
             &mut file,
             &mut files,
             separator,
             Vec::new(),
             &settings,
-        )
+        );
+        if sender_option.is_none() {
+            // We have already read the whole input. Since we are in our first two reads,
+            // this means that we can fit the whole input into memory. Bypass writing below and
+            // handle this case in a more straightforward way.
+            return if let Ok(first_chunk) = receiver.recv() {
+                if let Ok(second_chunk) = receiver.recv() {
+                    ReadResult::SortedTwoChunks([first_chunk, second_chunk])
+                } else {
+                    ReadResult::SortedSingleChunk(first_chunk)
+                }
+            } else {
+                ReadResult::EmptyInput
+            };
+        }
     }
     let mut file_number = 0;
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
-            _ => return file_number,
+            _ => {
+                return ReadResult::WroteChunksToFile {
+                    chunks_written: file_number,
+                }
+            }
         };
         write(
@@ -129,13 +169,14 @@ fn reader_writer(
             separator,
         );
-        let (recycled_lines, recycled_buffer) = chunk.recycle();
         file_number += 1;
+        let (recycled_lines, recycled_buffer) = chunk.recycle();
         chunks::read(
             &mut sender_option,
             recycled_buffer,
+            None,
             &mut carry_over,
             &mut file,
             &mut files,
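
The SortedTwoChunks path above leans on Itertools::merge_by, which lazily interleaves two already-sorted iterators. A standalone sketch of that call pattern, substituting plain strings for the crate's `Line` type and `Ord` for `compare_by`:

use itertools::Itertools; // provides merge_by
use std::cmp::Ordering;

fn main() {
    let a = vec!["apple", "cherry", "fig"];
    let b = vec!["banana", "date", "grape"];
    // The predicate returns true while the element from `a` should come
    // first; `!= Ordering::Greater` keeps the merge stable for equal lines.
    let merged: Vec<_> = a
        .iter()
        .merge_by(b.iter(), |x, y| x.cmp(y) != Ordering::Greater)
        .collect();
    println!("{:?}", merged); // ["apple", "banana", "cherry", "date", "fig", "grape"]
}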

@@ -108,6 +108,7 @@ fn reader(
         chunks::read(
             sender,
             recycled_buffer,
+            None,
             carry_over,
             file,
             &mut iter::empty(),

@@ -93,7 +93,10 @@ static THOUSANDS_SEP: char = ',';
 static NEGATIVE: char = '-';
 static POSITIVE: char = '+';

-static DEFAULT_BUF_SIZE: usize = std::usize::MAX;
+/// Choosing a higher buffer size does not result in performance improvements
+/// (at least not on my machine). TODO: In the future, we should also take the amount of
+/// available memory into consideration, instead of relying on this constant only.
+static DEFAULT_BUF_SIZE: usize = 1_000_000_000;

 #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)]
 enum SortMode {
@@ -127,7 +130,6 @@ pub struct GlobalSettings {
     zero_terminated: bool,
     buffer_size: usize,
     tmp_dir: PathBuf,
-    ext_sort: bool,
 }

 impl GlobalSettings {
@@ -189,7 +191,6 @@ impl Default for GlobalSettings {
             zero_terminated: false,
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
-            ext_sort: false,
         }
     }
 }
@@ -941,28 +942,15 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         env::set_var("RAYON_NUM_THREADS", &settings.threads);
     }

-    if matches.is_present(OPT_BUF_SIZE) {
-        settings.buffer_size = {
-            let input = matches
-                .value_of(OPT_BUF_SIZE)
-                .map(String::from)
-                .unwrap_or(format!("{}", DEFAULT_BUF_SIZE));
-            GlobalSettings::human_numeric_convert(&input)
-        };
-        settings.ext_sort = true;
-    }
-
-    if matches.is_present(OPT_TMP_DIR) {
-        let result = matches
-            .value_of(OPT_TMP_DIR)
-            .map(String::from)
-            .unwrap_or(format!("{}", env::temp_dir().display()));
-        settings.tmp_dir = PathBuf::from(result);
-        settings.ext_sort = true;
-    } else {
-        settings.tmp_dir = env::temp_dir();
-    }
+    settings.buffer_size = matches
+        .value_of(OPT_BUF_SIZE)
+        .map(GlobalSettings::human_numeric_convert)
+        .unwrap_or(DEFAULT_BUF_SIZE);
+
+    settings.tmp_dir = matches
+        .value_of(OPT_TMP_DIR)
+        .map(PathBuf::from)
+        .unwrap_or_else(env::temp_dir);

     settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
     settings.merge = matches.is_present(OPT_MERGE);
@@ -1047,7 +1035,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
     exec(&files, &settings)
 }

-fn output_sorted_lines<'a>(iter: impl Iterator<Item = Line<'a>>, settings: &GlobalSettings) {
+fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) {
     if settings.unique {
         print_sorted(
             iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
@@ -1067,34 +1055,10 @@ fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
             crash!(1, "only one file allowed with -c");
         }
         return check::check(files.first().unwrap(), settings);
-    } else if settings.ext_sort {
+    } else {
         let mut lines = files.iter().filter_map(open);
-        let mut sorted = ext_sort(&mut lines, &settings);
-        sorted.file_merger.write_all(settings);
-    } else {
-        let separator = if settings.zero_terminated { '\0' } else { '\n' };
-        let mut lines = vec![];
-        let mut full_string = String::new();
-
-        for mut file in files.iter().filter_map(open) {
-            crash_if_err!(1, file.read_to_string(&mut full_string));
-            if !full_string.ends_with(separator) {
-                full_string.push(separator);
-            }
-        }
-
-        if full_string.ends_with(separator) {
-            full_string.pop();
-        }
-
-        for line in full_string.split(if settings.zero_terminated { '\0' } else { '\n' }) {
-            lines.push(Line::create(line, &settings));
-        }
-
-        sort_by(&mut lines, &settings);
-        output_sorted_lines(lines.into_iter(), &settings);
+        ext_sort(&mut lines, &settings);
     }
     0
 }
@@ -1366,7 +1330,7 @@ fn version_compare(a: &str, b: &str) -> Ordering {
     }
 }

-fn print_sorted<'a, T: Iterator<Item = Line<'a>>>(iter: T, settings: &GlobalSettings) {
+fn print_sorted<'a, T: Iterator<Item = &'a Line<'a>>>(iter: T, settings: &GlobalSettings) {
     let mut writer = settings.out_writer();
     for line in iter {
         line.print(&mut writer, settings);

@@ -15,30 +15,19 @@ fn test_helper(file_name: &str, args: &str) {
         .stdout_is_fixture(format!("{}.expected.debug", file_name));
 }

-// FYI, the initialization size of our Line struct is 96 bytes.
-//
-// At very small buffer sizes, with that overhead we are certainly going
-// to overrun our buffer way, way, way too quickly because of these excess
-// bytes for the struct.
-//
-// For instance, seq 0..20000 > ...text = 108894 bytes
-// But overhead is 1920000 + 108894 = 2028894 bytes
-//
-// Or kjvbible-random.txt = 4332506 bytes, but minimum size of its
-// 99817 lines in memory * 96 bytes = 9582432 bytes
-//
-// Here, we test 108894 bytes with a 50K buffer
-//
 #[test]
-fn test_larger_than_specified_segment() {
+fn test_buffer_sizes() {
+    let buffer_sizes = ["0", "50K", "1M", "1000G"];
+    for buffer_size in &buffer_sizes {
         new_ucmd!()
             .arg("-n")
             .arg("-S")
-            .arg("50K")
+            .arg(buffer_size)
             .arg("ext_sort.txt")
             .succeeds()
             .stdout_is_fixture("ext_sort.expected");
     }
+}

 #[test]
 fn test_smaller_than_specified_segment() {