From 38effc93b3d8a34a1136a9911eb9b1e0da7359c7 Mon Sep 17 00:00:00 2001
From: Michael Debertol
Date: Fri, 7 May 2021 23:39:00 +0200
Subject: [PATCH] sort: use FileMerger for extsort merge step
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

FileMerger is much more efficient than the previous algorithm, which
looped over the current head of every chunk to determine the next
element. FileMerger uses a BinaryHeap, which should bring the complexity
of the merge step down from O(n²) to O(n log n).
---
A standalone sketch of this heap-based merge, with simplified types,
follows after the diff.

 src/uu/sort/src/external_sort/mod.rs | 178 +++++----------------------
 1 file changed, 30 insertions(+), 148 deletions(-)

diff --git a/src/uu/sort/src/external_sort/mod.rs b/src/uu/sort/src/external_sort/mod.rs
index 725b17bbd..af6902367 100644
--- a/src/uu/sort/src/external_sort/mod.rs
+++ b/src/uu/sort/src/external_sort/mod.rs
@@ -1,91 +1,33 @@
-use std::cmp::Ordering;
-use std::collections::VecDeque;
-use std::fs::{File, OpenOptions};
-use std::io::SeekFrom;
-use std::io::{BufRead, BufReader, BufWriter, Seek, Write};
+use std::fs::OpenOptions;
+use std::io::{BufWriter, Write};
 use std::path::Path;
 
 use tempdir::TempDir;
 
+use crate::{file_to_lines_iter, FileMerger};
+
 use super::{GlobalSettings, Line};
 
 /// Iterator that provides sorted `T`s
-pub struct ExtSortedIterator {
-    buffers: Vec<VecDeque<Line>>,
-    chunk_offsets: Vec<u64>,
-    max_per_chunk: usize,
-    chunks: usize,
-    tmp_dir: TempDir,
-    settings: GlobalSettings,
-    failed: bool,
+pub struct ExtSortedIterator<'a> {
+    file_merger: FileMerger<'a>,
+    // Keep tmp_dir around, it is deleted when dropped.
+    _tmp_dir: TempDir,
 }
 
-impl Iterator for ExtSortedIterator {
+impl<'a> Iterator for ExtSortedIterator<'a> {
     type Item = Line;
-
-    /// # Errors
-    ///
-    /// This method can fail due to issues reading intermediate sorted chunks
-    /// from disk
     fn next(&mut self) -> Option<Self::Item> {
-        if self.failed {
-            return None;
-        }
-        // fill up any empty buffers
-        let mut empty = true;
-        for chunk_num in 0..self.chunks {
-            if self.buffers[chunk_num as usize].is_empty() {
-                let mut f = crash_if_err!(
-                    1,
-                    File::open(self.tmp_dir.path().join(chunk_num.to_string()))
-                );
-                crash_if_err!(1, f.seek(SeekFrom::Start(self.chunk_offsets[chunk_num])));
-                let bytes_read = fill_buff(
-                    &mut self.buffers[chunk_num as usize],
-                    f,
-                    self.max_per_chunk,
-                    &self.settings,
-                );
-                self.chunk_offsets[chunk_num as usize] += bytes_read as u64;
-                if !self.buffers[chunk_num as usize].is_empty() {
-                    empty = false;
-                }
-            } else {
-                empty = false;
-            }
-        }
-        if empty {
-            return None;
-        }
-
-        // find the next record to write
-        // check is_empty() before unwrap()ing
-        let mut idx = 0;
-        for chunk_num in 0..self.chunks as usize {
-            if !self.buffers[chunk_num].is_empty()
-                && (self.buffers[idx].is_empty()
-                    || super::compare_by(
-                        self.buffers[chunk_num].front().unwrap(),
-                        self.buffers[idx].front().unwrap(),
-                        &self.settings,
-                    ) == Ordering::Less)
-            {
-                idx = chunk_num;
-            }
-        }
-
-        // unwrap due to checks above
-        let r = self.buffers[idx].pop_front().unwrap();
-        Some(r)
+        self.file_merger.next()
     }
 }
 
 /// Sort (based on `compare`) the `T`s provided by `unsorted` and return an
 /// iterator
 ///
-/// # Errors
+/// # Panics
 ///
-/// This method can fail due to issues writing intermediate sorted chunks
+/// This method can panic due to issues writing intermediate sorted chunks
 /// to disk.
 pub fn ext_sort(
     unsorted: impl Iterator<Item = Line>,
@@ -93,19 +35,12 @@ pub fn ext_sort(
 ) -> ExtSortedIterator {
     let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
 
-    let mut iter = ExtSortedIterator {
-        buffers: Vec::new(),
-        chunk_offsets: Vec::new(),
-        max_per_chunk: 0,
-        chunks: 0,
-        tmp_dir,
-        settings: settings.clone(),
-        failed: false,
-    };
-
     let mut total_read = 0;
     let mut chunk = Vec::new();
 
+    let mut chunks_read = 0;
+    let mut file_merger = FileMerger::new(settings);
+
     // make the initial chunks on disk
     for seq in unsorted {
         let seq_size = seq.estimate_size();
@@ -113,62 +48,35 @@ pub fn ext_sort(
 
         chunk.push(seq);
 
-        if total_read >= settings.buffer_size {
+        if total_read >= settings.buffer_size && chunk.len() >= 2 {
             super::sort_by(&mut chunk, &settings);
-            write_chunk(
-                settings,
-                &iter.tmp_dir.path().join(iter.chunks.to_string()),
-                &mut chunk,
-            );
+
+            let file_path = tmp_dir.path().join(chunks_read.to_string());
+            write_chunk(settings, &file_path, &mut chunk);
             chunk.clear();
             total_read = 0;
-            iter.chunks += 1;
+            chunks_read += 1;
+
+            file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap()))
         }
     }
     // write the last chunk
     if !chunk.is_empty() {
         super::sort_by(&mut chunk, &settings);
+
+        let file_path = tmp_dir.path().join(chunks_read.to_string());
         write_chunk(
             settings,
-            &iter.tmp_dir.path().join(iter.chunks.to_string()),
+            &tmp_dir.path().join(chunks_read.to_string()),
             &mut chunk,
         );
-        iter.chunks += 1;
+
+        file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap()));
    }
-
-    // initialize buffers for each chunk
-    //
-    // Having a right sized buffer for each chunk for smallish values seems silly to me?
-    //
-    // We will have to have the entire iter in memory sometime right?
-    // Set minimum to the size of the writer buffer, ~8K
-
-    const MINIMUM_READBACK_BUFFER: usize = 8200;
-    let right_sized_buffer = settings
-        .buffer_size
-        .checked_div(iter.chunks)
-        .unwrap_or(settings.buffer_size);
-    iter.max_per_chunk = if right_sized_buffer > MINIMUM_READBACK_BUFFER {
-        right_sized_buffer
-    } else {
-        MINIMUM_READBACK_BUFFER
-    };
-    iter.buffers = vec![VecDeque::new(); iter.chunks];
-    iter.chunk_offsets = vec![0; iter.chunks];
-    for chunk_num in 0..iter.chunks {
-        let offset = fill_buff(
-            &mut iter.buffers[chunk_num],
-            crash_if_err!(
-                1,
-                File::open(iter.tmp_dir.path().join(chunk_num.to_string()))
-            ),
-            iter.max_per_chunk,
-            &settings,
-        );
-        iter.chunk_offsets[chunk_num] = offset as u64;
+    ExtSortedIterator {
+        file_merger,
+        _tmp_dir: tmp_dir,
     }
-
-    iter
 }
 
 fn write_chunk(settings: &GlobalSettings, file: &Path, chunk: &mut Vec<Line>) {
@@ -183,29 +91,3 @@ fn write_chunk(settings: &GlobalSettings, file: &Path, chunk: &mut Vec<Line>) {
     }
     crash_if_err!(1, buf_write.flush());
 }
-
-fn fill_buff(
-    vec: &mut VecDeque<Line>,
-    file: File,
-    max_bytes: usize,
-    settings: &GlobalSettings,
-) -> usize {
-    let mut total_read = 0;
-    let mut bytes_read = 0;
-    for line in BufReader::new(file).split(if settings.zero_terminated {
-        b'\0'
-    } else {
-        b'\n'
-    }) {
-        let line_s = String::from_utf8(crash_if_err!(1, line)).unwrap();
-        bytes_read += line_s.len() + 1;
-        let deserialized = Line::new(line_s, settings);
-        total_read += deserialized.estimate_size();
-        vec.push_back(deserialized);
-        if total_read > max_bytes {
-            break;
-        }
-    }
-
-    bytes_read
-}
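
Below is a minimal, self-contained sketch of the heap-based k-way merge
the commit message describes. It is an illustration, not the actual
FileMerger implementation: it assumes plain String items and
lexicographic order, whereas the real code merges Line values with the
settings-aware compare_by and reads them lazily via file_to_lines_iter.
The complexity argument is the same: each of the n output elements costs
one O(log k) heap operation (k = number of chunk files), instead of the
O(k) scan over every chunk's head that the deleted next() loop performed.

use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

/// One sorted input plus the element currently at its head.
/// Only `head` takes part in the ordering; the iterator is carried along.
struct MergeEntry<I: Iterator<Item = String>> {
    head: String,
    iter: I,
}

impl<I: Iterator<Item = String>> PartialEq for MergeEntry<I> {
    fn eq(&self, other: &Self) -> bool {
        self.head == other.head
    }
}

impl<I: Iterator<Item = String>> Eq for MergeEntry<I> {}

impl<I: Iterator<Item = String>> PartialOrd for MergeEntry<I> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<I: Iterator<Item = String>> Ord for MergeEntry<I> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.head.cmp(&other.head)
    }
}

/// Merge k sorted inputs into one sorted output. Each element costs one
/// O(log k) heap push/pop instead of an O(k) scan over all inputs.
fn merge_sorted<I: Iterator<Item = String>>(inputs: Vec<I>) -> Vec<String> {
    // Reverse turns the standard library's max-heap into a min-heap.
    let mut heap: BinaryHeap<Reverse<MergeEntry<I>>> = BinaryHeap::new();
    for mut iter in inputs {
        if let Some(head) = iter.next() {
            heap.push(Reverse(MergeEntry { head, iter }));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse(MergeEntry { head, mut iter })) = heap.pop() {
        out.push(head);
        // Refill from the same input so its next element competes again.
        if let Some(next) = iter.next() {
            heap.push(Reverse(MergeEntry { head: next, iter }));
        }
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![
        vec!["apple", "melon", "pear"].into_iter().map(String::from),
        vec!["banana", "cherry"].into_iter().map(String::from),
    ]);
    assert_eq!(merged, ["apple", "banana", "cherry", "melon", "pear"]);
}

Popping always yields the globally smallest head, and refilling from the
same input keeps exactly one candidate per chunk on the heap, so the
sketch holds only k lines in memory during the merge rather than whole
chunks, which is the same property the patch gets by pushing one lazy
line iterator per chunk file into FileMerger.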