mirror of https://github.com/RGBCube/uutils-coreutils

sort: use FileMerger for extsort merge step

FileMerger is much more efficient than the previous algorithm,
which looped over all elements every time to determine the next element.

FileMerger uses a BinaryHeap, which should bring the complexity for
the merge step down from O(n²) to O(n log n).
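(Illustrative sketch, not part of the commit: FileMerger streams lines from the sorted chunk files, but the core idea it relies on is a k-way merge through a binary min-heap. The names below — merge_sorted_chunks, plain i32 chunks — are hypothetical and only demonstrate the technique the commit message describes.)

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted chunks into one sorted Vec.
/// Each heap entry is (next value, chunk index, position in chunk),
/// wrapped in `Reverse` so Rust's max-heap behaves as a min-heap.
fn merge_sorted_chunks(chunks: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (chunk_idx, chunk) in chunks.iter().enumerate() {
        if let Some(&first) = chunk.first() {
            heap.push(Reverse((first, chunk_idx, 0usize)));
        }
    }

    let mut merged = Vec::new();
    // Each pop/push is O(log k) for k chunks, so producing n output
    // elements costs O(n log k) instead of scanning every chunk head
    // once per output element.
    while let Some(Reverse((value, chunk_idx, pos))) = heap.pop() {
        merged.push(value);
        if let Some(&next) = chunks[chunk_idx].get(pos + 1) {
            heap.push(Reverse((next, chunk_idx, pos + 1)));
        }
    }
    merged
}

fn main() {
    let chunks = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    assert_eq!(merge_sorted_chunks(&chunks), (1..=9).collect::<Vec<_>>());
}

Only the current head of each chunk lives in the heap at any time, which is what lets the merge step stay cheap even with many chunk files on disk.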
Michael Debertol 2021-05-07 23:39:00 +02:00 committed by Sylvestre Ledru
parent 64c1f16421
commit 38effc93b3


@@ -1,91 +1,33 @@
-use std::cmp::Ordering;
-use std::collections::VecDeque;
-use std::fs::{File, OpenOptions};
-use std::io::SeekFrom;
-use std::io::{BufRead, BufReader, BufWriter, Seek, Write};
+use std::fs::OpenOptions;
+use std::io::{BufWriter, Write};
 use std::path::Path;
 
 use tempdir::TempDir;
 
+use crate::{file_to_lines_iter, FileMerger};
+
 use super::{GlobalSettings, Line};
 
 /// Iterator that provides sorted `T`s
-pub struct ExtSortedIterator {
-    buffers: Vec<VecDeque<Line>>,
-    chunk_offsets: Vec<u64>,
-    max_per_chunk: usize,
-    chunks: usize,
-    tmp_dir: TempDir,
-    settings: GlobalSettings,
-    failed: bool,
+pub struct ExtSortedIterator<'a> {
+    file_merger: FileMerger<'a>,
+    // Keep tmp_dir around, it is deleted when dropped.
+    _tmp_dir: TempDir,
 }
 
-impl Iterator for ExtSortedIterator {
+impl<'a> Iterator for ExtSortedIterator<'a> {
     type Item = Line;
 
-    /// # Errors
-    ///
-    /// This method can fail due to issues reading intermediate sorted chunks
-    /// from disk
    fn next(&mut self) -> Option<Self::Item> {
-        if self.failed {
-            return None;
-        }
-        // fill up any empty buffers
-        let mut empty = true;
-        for chunk_num in 0..self.chunks {
-            if self.buffers[chunk_num as usize].is_empty() {
-                let mut f = crash_if_err!(
-                    1,
-                    File::open(self.tmp_dir.path().join(chunk_num.to_string()))
-                );
-                crash_if_err!(1, f.seek(SeekFrom::Start(self.chunk_offsets[chunk_num])));
-                let bytes_read = fill_buff(
-                    &mut self.buffers[chunk_num as usize],
-                    f,
-                    self.max_per_chunk,
-                    &self.settings,
-                );
-                self.chunk_offsets[chunk_num as usize] += bytes_read as u64;
-                if !self.buffers[chunk_num as usize].is_empty() {
-                    empty = false;
-                }
-            } else {
-                empty = false;
-            }
-        }
-        if empty {
-            return None;
-        }
-
-        // find the next record to write
-        // check is_empty() before unwrap()ing
-        let mut idx = 0;
-        for chunk_num in 0..self.chunks as usize {
-            if !self.buffers[chunk_num].is_empty()
-                && (self.buffers[idx].is_empty()
-                    || super::compare_by(
-                        self.buffers[chunk_num].front().unwrap(),
-                        self.buffers[idx].front().unwrap(),
-                        &self.settings,
-                    ) == Ordering::Less)
-            {
-                idx = chunk_num;
-            }
-        }
-
-        // unwrap due to checks above
-        let r = self.buffers[idx].pop_front().unwrap();
-        Some(r)
+        self.file_merger.next()
    }
 }
 
 /// Sort (based on `compare`) the `T`s provided by `unsorted` and return an
 /// iterator
 ///
-/// # Errors
+/// # Panics
 ///
-/// This method can fail due to issues writing intermediate sorted chunks
+/// This method can panic due to issues writing intermediate sorted chunks
 /// to disk.
 pub fn ext_sort(
     unsorted: impl Iterator<Item = Line>,
@@ -93,19 +35,12 @@ pub fn ext_sort(
 ) -> ExtSortedIterator {
     let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
 
-    let mut iter = ExtSortedIterator {
-        buffers: Vec::new(),
-        chunk_offsets: Vec::new(),
-        max_per_chunk: 0,
-        chunks: 0,
-        tmp_dir,
-        settings: settings.clone(),
-        failed: false,
-    };
-
     let mut total_read = 0;
     let mut chunk = Vec::new();
 
+    let mut chunks_read = 0;
+    let mut file_merger = FileMerger::new(settings);
+
     // make the initial chunks on disk
     for seq in unsorted {
         let seq_size = seq.estimate_size();
@@ -113,62 +48,35 @@ pub fn ext_sort(
         chunk.push(seq);
 
-        if total_read >= settings.buffer_size {
+        if total_read >= settings.buffer_size && chunk.len() >= 2 {
             super::sort_by(&mut chunk, &settings);
-            write_chunk(
-                settings,
-                &iter.tmp_dir.path().join(iter.chunks.to_string()),
-                &mut chunk,
-            );
+
+            let file_path = tmp_dir.path().join(chunks_read.to_string());
+            write_chunk(settings, &file_path, &mut chunk);
             chunk.clear();
             total_read = 0;
-            iter.chunks += 1;
+            chunks_read += 1;
+
+            file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap()))
         }
     }
     // write the last chunk
     if !chunk.is_empty() {
         super::sort_by(&mut chunk, &settings);
+        let file_path = tmp_dir.path().join(chunks_read.to_string());
         write_chunk(
             settings,
-            &iter.tmp_dir.path().join(iter.chunks.to_string()),
+            &tmp_dir.path().join(chunks_read.to_string()),
             &mut chunk,
         );
-        iter.chunks += 1;
+        file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap()));
     }
 
-    // initialize buffers for each chunk
-    //
-    // Having a right sized buffer for each chunk for smallish values seems silly to me?
-    //
-    // We will have to have the entire iter in memory sometime right?
-    // Set minimum to the size of the writer buffer, ~8K
-    const MINIMUM_READBACK_BUFFER: usize = 8200;
-    let right_sized_buffer = settings
-        .buffer_size
-        .checked_div(iter.chunks)
-        .unwrap_or(settings.buffer_size);
-    iter.max_per_chunk = if right_sized_buffer > MINIMUM_READBACK_BUFFER {
-        right_sized_buffer
-    } else {
-        MINIMUM_READBACK_BUFFER
-    };
-    iter.buffers = vec![VecDeque::new(); iter.chunks];
-    iter.chunk_offsets = vec![0; iter.chunks];
-    for chunk_num in 0..iter.chunks {
-        let offset = fill_buff(
-            &mut iter.buffers[chunk_num],
-            crash_if_err!(
-                1,
-                File::open(iter.tmp_dir.path().join(chunk_num.to_string()))
-            ),
-            iter.max_per_chunk,
-            &settings,
-        );
-        iter.chunk_offsets[chunk_num] = offset as u64;
-    }
-
-    iter
+    ExtSortedIterator {
+        file_merger,
+        _tmp_dir: tmp_dir,
+    }
 }
@@ -183,29 +91,3 @@ fn write_chunk(settings: &GlobalSettings, file: &Path, chunk: &mut Vec<Line>) {
     }
     crash_if_err!(1, buf_write.flush());
 }
-
-fn fill_buff(
-    vec: &mut VecDeque<Line>,
-    file: File,
-    max_bytes: usize,
-    settings: &GlobalSettings,
-) -> usize {
-    let mut total_read = 0;
-    let mut bytes_read = 0;
-    for line in BufReader::new(file).split(if settings.zero_terminated {
-        b'\0'
-    } else {
-        b'\n'
-    }) {
-        let line_s = String::from_utf8(crash_if_err!(1, line)).unwrap();
-        bytes_read += line_s.len() + 1;
-        let deserialized = Line::new(line_s, settings);
-        total_read += deserialized.estimate_size();
-        vec.push_back(deserialized);
-        if total_read > max_bytes {
-            break;
-        }
-    }
-    bytes_read
-}