diff --git a/src/uu/sort/src/external_sort/LICENSE b/src/uu/sort/src/external_sort/LICENSE new file mode 100644 index 000000000..e26c89c9f --- /dev/null +++ b/src/uu/sort/src/external_sort/LICENSE @@ -0,0 +1,19 @@ +Copyright 2018 Battelle Memorial Institute + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
\ No newline at end of file diff --git a/src/uu/sort/src/external_sort/mod.rs b/src/uu/sort/src/external_sort/mod.rs new file mode 100644 index 000000000..9fcaadcc3 --- /dev/null +++ b/src/uu/sort/src/external_sort/mod.rs @@ -0,0 +1,246 @@ +use std::{clone::Clone}; +use std::cmp::Ordering::Less; +use std::collections::VecDeque; +use std::error::Error; +use std::fs::{File, OpenOptions}; +use std::io::SeekFrom::Start; +use std::io::{BufRead, BufReader, Seek, Write}; +use std::marker::PhantomData; +use std::path::PathBuf; + +use serde::de::DeserializeOwned; +use serde::Serialize; +use serde_json; +use tempdir::TempDir; + +use super::{GlobalSettings, Line}; + +/// Trait for types that can be used by +/// [ExternalSorter](struct.ExternalSorter.html). Must be sortable, cloneable, +/// serializable, and able to report on its size +pub trait ExternallySortable: Clone + Serialize + DeserializeOwned { + /// Get the size, in bytes, of this object (used to constrain the buffer + /// used in the external sort). 
+ fn get_size(&self) -> u64; +} + +/// Iterator that provides sorted `Line`s +pub struct ExtSortedIterator { + buffers: Vec>, + chunk_offsets: Vec, + max_per_chunk: u64, + chunks: u64, + tmp_dir: TempDir, + settings: GlobalSettings, + failed: bool, +} + +impl Iterator for ExtSortedIterator +where + Line: ExternallySortable, +{ + type Item = Result>; + + /// # Errors + /// + /// This method can fail due to issues reading intermediate sorted chunks + /// from disk, or due to serde deserialization issues + fn next(&mut self) -> Option { + if self.failed { + return None; + } + // refill any empty buffers from their on-disk chunk files + let mut empty = true; + for chunk_num in 0..self.chunks { + if self.buffers[chunk_num as usize].is_empty() { + let mut f = match File::open(self.tmp_dir.path().join(chunk_num.to_string())) { + Ok(f) => f, + Err(e) => { + self.failed = true; + return Some(Err(Box::new(e))); + } + }; + match f.seek(Start(self.chunk_offsets[chunk_num as usize])) { + Ok(_) => (), + Err(e) => { + self.failed = true; + return Some(Err(Box::new(e))); + } + } + let bytes_read = + match fill_buff(&mut self.buffers[chunk_num as usize], f, self.max_per_chunk) { + Ok(bytes_read) => bytes_read, + Err(e) => { + self.failed = true; + return Some(Err(e)); + } + }; + self.chunk_offsets[chunk_num as usize] += bytes_read; + if !self.buffers[chunk_num as usize].is_empty() { + empty = false; + } + } else { + empty = false; + } + } + if empty { + return None; + } + + // find the smallest front record among the non-empty buffers + // check is_empty() before unwrap()ing + let mut idx = 0; + for chunk_num in 0..self.chunks as usize { + if !self.buffers[chunk_num].is_empty() { + if self.buffers[idx].is_empty() || (super::compare_by)( + self.buffers[chunk_num].front().unwrap(), + self.buffers[idx].front().unwrap(), + &self.settings + ) == Less + { + idx = chunk_num; + } + } + } + + // unwrap cannot fail: the checks above guarantee buffers[idx] is non-empty + let r = self.buffers[idx].pop_front().unwrap(); + Some(Ok(r)) + } +} + +/// Perform an external sort on an unsorted stream of 
incoming data +pub struct ExternalSorter +where + Line: ExternallySortable, +{ + tmp_dir: Option, + buffer_bytes: u64, + phantom: PhantomData, + settings: GlobalSettings, +} + +impl ExternalSorter +where + Line: ExternallySortable, +{ + /// Create a new `ExternalSorter` with a specified memory buffer and + /// temporary directory + pub fn new(buffer_bytes: u64, tmp_dir: Option, settings: GlobalSettings) -> ExternalSorter { + ExternalSorter { + buffer_bytes, + tmp_dir, + phantom: PhantomData, + settings, + } + } + + /// Sort (via `super::sort_by`) the `Line`s provided by `unsorted` and return an + /// iterator + /// + /// # Errors + /// + /// This method can fail due to issues writing intermediate sorted chunks + /// to disk, or due to serde serialization issues + pub fn sort_by(&self, unsorted: I, settings: GlobalSettings) -> Result, Box> + where + I: Iterator, + { + let tmp_dir = match self.tmp_dir { + Some(ref p) => TempDir::new_in(p, "uutils_sort")?, + None => TempDir::new("uutils_sort")?, + }; + // creating the thing we need to return first due to the fact that we need to + // borrow tmp_dir and move it out + let mut iter = ExtSortedIterator { + buffers: Vec::new(), + chunk_offsets: Vec::new(), + max_per_chunk: 0, + chunks: 0, + tmp_dir, + settings, + failed: false, + }; + + { + let mut total_read = 0; + let mut chunk = Vec::new(); + + // make the initial chunks on disk + for seq in unsorted { + total_read += seq.get_size(); + chunk.push(seq); + + if total_read >= self.buffer_bytes { + super::sort_by(&mut chunk, &self.settings); + self.write_chunk( + &iter.tmp_dir.path().join(iter.chunks.to_string()), + &mut chunk, + )?; + chunk.clear(); + total_read = 0; + iter.chunks += 1; + } + } + // write the last (partial) chunk, if any + if !chunk.is_empty() { + super::sort_by(&mut chunk, &self.settings); + self.write_chunk( + &iter.tmp_dir.path().join(iter.chunks.to_string()), + &mut chunk, + )?; + iter.chunks += 1; + } + + // initialize buffers for each chunk + iter.max_per_chunk = 
self.buffer_bytes.checked_div(iter.chunks).unwrap_or(self.buffer_bytes); + iter.buffers = vec![VecDeque::new(); iter.chunks as usize]; + iter.chunk_offsets = vec![0u64; iter.chunks as usize]; + for chunk_num in 0..iter.chunks { + let offset = fill_buff( + &mut iter.buffers[chunk_num as usize], + File::open(iter.tmp_dir.path().join(chunk_num.to_string()))?, + iter.max_per_chunk, + )?; + iter.chunk_offsets[chunk_num as usize] = offset; + } + } + + Ok(iter) + } + + fn write_chunk(&self, file: &PathBuf, chunk: &mut Vec) -> Result<(), Box> { + let mut new_file = OpenOptions::new().create(true).append(true).open(file)?; + for s in chunk { + let mut serialized = serde_json::to_string(&s)?; + serialized.push('\n'); + new_file.write_all(serialized.as_bytes())?; + } + + Ok(()) + } +} + +fn fill_buff(vec: &mut VecDeque, file: File, max_bytes: u64) -> Result> +where + Line: ExternallySortable, +{ + let mut total_read = 0; + let mut bytes_read = 0; + for line in BufReader::new(file).lines() { + let line_s = line?; + bytes_read += line_s.len() + 1; + // Deserialization is the usual failure point; hand the error to the caller instead of panicking + let deserialized: Line = match serde_json::from_str(&line_s) { + Ok(x) => x, + Err(err) => return Err(Box::new(err)), + }; + total_read += deserialized.get_size(); + vec.push_back(deserialized); + if total_read > max_bytes { + break; + } + } + + Ok(bytes_read as u64) +} diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 8c3a0cf7f..e77271557 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -91,7 +91,7 @@ static NEGATIVE: char = '-'; static POSITIVE: char = '+'; static DEFAULT_TMPDIR: &str = r"/tmp"; -// 16GB buffer for Vec before we dump to disk +// 16GB buffer for Vec before we dump to disk, never used static DEFAULT_BUF_SIZE: usize = 16000000000; #[derive(Eq, Ord, PartialEq, PartialOrd, Clone)] @@ -292,14 +292,6 @@ impl ExternallySortable for Line { } } -impl PartialEq for Line { - fn eq(&self, other: 
&Self) -> bool { - self.line == other.line - } -} - -impl Eq for Line {} - impl Line { fn new(line: String, settings: &GlobalSettings) -> Self { let fields = if settings @@ -343,7 +335,7 @@ impl Line { ); range.shorten(num_range); NumCache::WithInfo(info) - } else if selector.settings.mode == SortMode::GeneralNumeric { + } else if selector.settings.mode == SortMode::GeneralNumeric && settings.buffer_size == DEFAULT_BUF_SIZE { NumCache::AsF64(permissive_f64_parse(get_leading_gen(range.get_str(&line)))) } else { NumCache::None @@ -1103,11 +1095,12 @@ fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 { fn ext_sort_by(unsorted: Vec, settings: GlobalSettings) -> Vec { let external_sorter = ExternalSorter::new(settings.buffer_size as u64, Some(settings.tmp_dir.clone()), settings.clone()); - let iter = external_sorter.sort_by(unsorted.into_iter(), settings.clone()).unwrap(); - let vec = iter.filter(|x| x.is_ok() ) - .map(|x| x.unwrap()) - .collect::>(); - vec + let iter = external_sorter + .sort_by(unsorted.into_iter(), settings.clone()) + .unwrap() + .map(|x| x.unwrap()) + .collect::>(); + iter } fn sort_by(lines: &mut Vec, settings: &GlobalSettings) { @@ -1130,10 +1123,15 @@ fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering (a_str, a_selection.num_cache.as_num_info()), (b_str, b_selection.num_cache.as_num_info()), ), - SortMode::GeneralNumeric => general_numeric_compare( - a_selection.num_cache.as_f64(), - b_selection.num_cache.as_f64(), - ), + // serde JSON has issues with f64 null values, so caching them won't work for us with ext sort + SortMode::GeneralNumeric => + if global_settings.buffer_size == DEFAULT_BUF_SIZE { + general_numeric_compare(a_selection.num_cache.as_f64(), + b_selection.num_cache.as_f64()) + } else { + general_numeric_compare(permissive_f64_parse(get_leading_gen(a_str)), + permissive_f64_parse(get_leading_gen(b_str))) + }, SortMode::Month => month_compare(a_str, b_str), SortMode::Version => 
version_compare(a_str, b_str), SortMode::Default => default_compare(a_str, b_str), diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index c76ab219a..63883cd63 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -51,6 +51,18 @@ fn test_human_numeric_whitespace() { test_helper("human-numeric-whitespace", "-h"); } +// This doesn't test the ext sort feature as such, just this codepath where +// ext sort can fail when reading back JSON if it finds a null value +#[test] +fn test_extsort_as64_bailout() { + new_ucmd!() + .arg("-g") + .arg("-S 10K") + .arg("multiple_decimals_general.txt") + .succeeds() + .stdout_is("\n\n\n\n\n\n\n\nCARAvan\n-2028789030\n-896689\n-8.90880\n-1\n-.05\n000\n00000001\n1\n1.040000000\n1.444\n1.58590\n8.013\n45\n46.89\n576,446.88800000\n576,446.890\n 4567.\n4567.1\n4567.34\n\t\t\t\t\t\t\t\t\t\t4567..457\n\t\t\t\t37800\n\t\t\t\t\t\t45670.89079.098\n\t\t\t\t\t\t45670.89079.1\n4798908.340000000000\n4798908.45\n4798908.8909800\n"); +} + #[test] fn test_multiple_decimals_general() { new_ucmd!()