From d7b7ce52bc28904f8af8ae36a9e43e698cbdd295 Mon Sep 17 00:00:00 2001 From: electricboogie <32370782+electricboogie@users.noreply.github.com> Date: Sun, 18 Apr 2021 11:54:18 -0500 Subject: [PATCH] Vendored ext_sorter, removed unstable, created a byte buffer sized vector instead of a numbered capacity vector --- Cargo.lock | 1 + src/uu/sort/Cargo.toml | 1 + src/uu/sort/src/ext_sorter.rs | 347 +++++++++++++++++++++++++++++ src/uu/sort/src/numeric_str_cmp.rs | 2 +- src/uu/sort/src/sort.rs | 112 +++++----- 5 files changed, 409 insertions(+), 54 deletions(-) create mode 100644 src/uu/sort/src/ext_sorter.rs diff --git a/Cargo.lock b/Cargo.lock index b7328009c..76f43d8b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2424,6 +2424,7 @@ dependencies = [ "serde", "serde_json", "smallvec 1.6.1", + "tempfile", "uucore", "uucore_procs", ] diff --git a/src/uu/sort/Cargo.toml b/src/uu/sort/Cargo.toml index 8a3d1ed25..e1e0d1b87 100644 --- a/src/uu/sort/Cargo.toml +++ b/src/uu/sort/Cargo.toml @@ -28,6 +28,7 @@ semver = "0.9.0" smallvec = { version = "1.6.1", features = ["serde"] } uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] } uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" } +tempfile = "3.1.0" [[bin]] name = "sort" diff --git a/src/uu/sort/src/ext_sorter.rs b/src/uu/sort/src/ext_sorter.rs new file mode 100644 index 000000000..c19f1262b --- /dev/null +++ b/src/uu/sort/src/ext_sorter.rs @@ -0,0 +1,347 @@ +// Copyright 2018 Andre-Philippe Paquet +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use rayon::prelude::*; +use std::{ + cmp::Ordering, + collections::VecDeque, + fs::{File, OpenOptions}, + io::{BufReader, BufWriter, Error, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, +}; + +/// Exposes external sorting (i.e. on disk sorting) capability on arbitrarily +/// sized iterator, even if the generated content of the iterator doesn't fit in +/// memory. +/// +/// It uses an in-memory buffer sorted and flushed to disk in segment files when +/// full. Once sorted, it returns a new sorted iterator with all items. In order +/// to remain efficient for all implementations, the crate doesn't handle +/// serialization, but leaves that to the user. +pub struct ExternalSorter { + segment_size: usize, + sort_dir: Option, + parallel: bool, +} + +impl ExternalSorter { + pub fn new() -> ExternalSorter { + ExternalSorter { + segment_size: 10000000, + sort_dir: None, + parallel: false, + } + } + + /// Sets the maximum size of each segment in number of sorted items. + /// + /// This number of items needs to fit in memory. While sorting, a + /// in-memory buffer is used to collect the items to be sorted. Once + /// it reaches the maximum size, it is sorted and then written to disk. + /// + /// Using a higher segment size makes sorting faster by leveraging + /// faster in-memory operations. + pub fn with_segment_size(mut self, size: usize) -> Self { + self.segment_size = size; + self + } + + /// Sets directory in which sorted segments will be written (if it doesn't + /// fit in memory). + pub fn with_sort_dir(mut self, path: PathBuf) -> Self { + self.sort_dir = Some(path); + self + } + + /// Uses Rayon to sort the in-memory buffer. + /// + /// This may not be needed if the buffer isn't big enough for parallelism to + /// be gainful over the overhead of multithreading. + pub fn with_parallel_sort(mut self) -> Self { + self.parallel = true; + self + } + + /// Sorts a given iterator, returning a new iterator with items + pub fn sort( + &self, + iterator: I, + ) -> Result Ordering + Send + Sync>, Error> + where + T: Sortable + Ord, + I: Iterator, + { + self.sort_by(iterator, |a, b| a.cmp(b)) + } + + /// Sorts a given iterator with a comparator function, returning a new iterator with items + pub fn sort_by(&self, iterator: I, cmp: F) -> Result, Error> + where + T: Sortable, + I: Iterator, + F: Fn(&T, &T) -> Ordering + Send + Sync, + { + let mut tempdir: Option = None; + let mut sort_dir: Option = None; + + let mut count = 0; + let mut segments_file: Vec = Vec::new(); + let size_of_items = std::mem::size_of::(); + let mut buffer: Vec = Vec::with_capacity(self.segment_size / size_of_items); + for next_item in iterator { + count += 1; + buffer.push(next_item); + if buffer.len() > self.segment_size { + let sort_dir = self.lazy_create_dir(&mut tempdir, &mut sort_dir)?; + self.sort_and_write_segment(sort_dir, &mut segments_file, &mut buffer, &cmp)?; + } + } + + // Write any items left in buffer, but only if we had at least 1 segment + // written. Otherwise we use the buffer itself to iterate from memory + let pass_through_queue = if !buffer.is_empty() && !segments_file.is_empty() { + let sort_dir = self.lazy_create_dir(&mut tempdir, &mut sort_dir)?; + self.sort_and_write_segment(sort_dir, &mut segments_file, &mut buffer, &cmp)?; + None + } else { + buffer.sort_by(&cmp); + Some(VecDeque::from(buffer)) + }; + + SortedIterator::new(tempdir, pass_through_queue, segments_file, count, cmp) + } + + /// Sorts a given iterator with a key extraction function, returning a new iterator with items + pub fn sort_by_key( + &self, + iterator: I, + f: F, + ) -> Result Ordering + Send + Sync>, Error> + where + T: Sortable, + I: Iterator, + F: Fn(&T) -> K + Send + Sync, + K: Ord, + { + self.sort_by(iterator, move |a, b| f(a).cmp(&f(b))) + } + + /// We only want to create directory if it's needed (i.e. if the dataset + /// doesn't fit in memory) to prevent filesystem latency + fn lazy_create_dir<'a>( + &self, + tempdir: &mut Option, + sort_dir: &'a mut Option, + ) -> Result<&'a Path, Error> { + if let Some(sort_dir) = sort_dir { + return Ok(sort_dir); + } + + *sort_dir = if let Some(ref sort_dir) = self.sort_dir { + Some(sort_dir.to_path_buf()) + } else { + *tempdir = Some(tempfile::TempDir::new()?); + Some(tempdir.as_ref().unwrap().path().to_path_buf()) + }; + + Ok(sort_dir.as_ref().unwrap()) + } + + fn sort_and_write_segment( + &self, + sort_dir: &Path, + segments: &mut Vec, + buffer: &mut Vec, + cmp: F, + ) -> Result<(), Error> + where + T: Sortable, + F: Fn(&T, &T) -> Ordering + Send + Sync, + { + if self.parallel { + buffer.par_sort_by(|a, b| cmp(a, b)); + } else { + buffer.sort_by(|a, b| cmp(a, b)); + } + + let segment_path = sort_dir.join(format!("{}", segments.len())); + let segment_file = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(&segment_path)?; + let mut buf_writer = BufWriter::new(segment_file); + + for item in buffer.drain(0..) { + item.encode(&mut buf_writer); + } + + let file = buf_writer.into_inner()?; + segments.push(file); + + Ok(()) + } +} + +impl Default for ExternalSorter { + fn default() -> Self { + ExternalSorter::new() + } +} + +pub trait Sortable: Sized + Send { + fn encode(&self, writer: &mut W); + fn decode(reader: &mut R) -> Option; +} + +pub struct SortedIterator { + _tempdir: Option, + pass_through_queue: Option>, + segments_file: Vec>, + next_values: Vec>, + count: u64, + cmp: F, +} + +impl Ordering + Send + Sync> SortedIterator { + fn new( + tempdir: Option, + pass_through_queue: Option>, + mut segments_file: Vec, + count: u64, + cmp: F, + ) -> Result, Error> { + for segment in &mut segments_file { + segment.seek(SeekFrom::Start(0))?; + } + + let next_values = segments_file + .iter_mut() + .map(|file| T::decode(file)) + .collect(); + + let segments_file_buffered = segments_file.into_iter().map(BufReader::new).collect(); + + Ok(SortedIterator { + _tempdir: tempdir, + pass_through_queue, + segments_file: segments_file_buffered, + next_values, + count, + cmp, + }) + } + + pub fn sorted_count(&self) -> u64 { + self.count + } +} + +impl Ordering> Iterator for SortedIterator { + type Item = T; + + fn next(&mut self) -> Option { + // if we have a pass through, we dequeue from it directly + if let Some(ptb) = self.pass_through_queue.as_mut() { + return ptb.pop_front(); + } + + // otherwise, we iter from segments on disk + let mut smallest_idx: Option = None; + { + let mut smallest: Option<&T> = None; + for idx in 0..self.segments_file.len() { + let next_value = self.next_values[idx].as_ref(); + if next_value.is_none() { + continue; + } + + if smallest.is_none() + || (self.cmp)(next_value.unwrap(), smallest.unwrap()) == Ordering::Less + { + smallest = Some(next_value.unwrap()); + smallest_idx = Some(idx); + } + } + } + + smallest_idx.map(|idx| { + let file = &mut self.segments_file[idx]; + let value = self.next_values[idx].take().unwrap(); + self.next_values[idx] = T::decode(file); + value + }) + } +} + +#[cfg(test)] +pub mod test { + use super::*; + + use byteorder::{ReadBytesExt, WriteBytesExt}; + + #[test] + fn test_smaller_than_segment() { + let sorter = ExternalSorter::new(); + let data: Vec = (0..100u32).collect(); + let data_rev: Vec = data.iter().rev().cloned().collect(); + + let sorted_iter = sorter.sort(data_rev.into_iter()).unwrap(); + + // should not have used any segments (all in memory) + assert_eq!(sorted_iter.segments_file.len(), 0); + let sorted_data: Vec = sorted_iter.collect(); + + assert_eq!(data, sorted_data); + } + + #[test] + fn test_multiple_segments() { + let sorter = ExternalSorter::new().with_segment_size(100); + let data: Vec = (0..1000u32).collect(); + + let data_rev: Vec = data.iter().rev().cloned().collect(); + let sorted_iter = sorter.sort(data_rev.into_iter()).unwrap(); + assert_eq!(sorted_iter.segments_file.len(), 10); + + let sorted_data: Vec = sorted_iter.collect(); + assert_eq!(data, sorted_data); + } + + #[test] + fn test_parallel() { + let sorter = ExternalSorter::new() + .with_segment_size(100) + .with_parallel_sort(); + let data: Vec = (0..1000u32).collect(); + + let data_rev: Vec = data.iter().rev().cloned().collect(); + let sorted_iter = sorter.sort(data_rev.into_iter()).unwrap(); + assert_eq!(sorted_iter.segments_file.len(), 10); + + let sorted_data: Vec = sorted_iter.collect(); + assert_eq!(data, sorted_data); + } + + impl Sortable for u32 { + fn encode(&self, writer: &mut W) { + writer.write_u32::(*self).unwrap(); + } + + fn decode(reader: &mut R) -> Option { + reader.read_u32::().ok() + } + } +} diff --git a/src/uu/sort/src/numeric_str_cmp.rs b/src/uu/sort/src/numeric_str_cmp.rs index ac615d1f7..b15eec988 100644 --- a/src/uu/sort/src/numeric_str_cmp.rs +++ b/src/uu/sort/src/numeric_str_cmp.rs @@ -14,8 +14,8 @@ //! More specifically, exponent can be understood so that the original number is in (1..10)*10^exponent. //! From that follows the constraints of this algorithm: It is able to compare numbers in ±(1*10^[i64::MIN]..10*10^[i64::MAX]). +use serde::{Deserialize, Serialize}; use std::{cmp::Ordering, ops::Range}; -use serde::{Serialize, Deserialize}; #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Clone)] enum Sign { diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 9052b2a5b..07a8879b7 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -16,6 +16,8 @@ extern crate uucore; mod numeric_str_cmp; +pub mod ext_sorter; +pub use ext_sorter::{ExternalSorter, Sortable, SortedIterator}; use clap::{App, Arg}; use fnv::FnvHasher; @@ -24,26 +26,20 @@ use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use semver::Version; +use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use std::borrow::Cow; use std::cmp::Ordering; use std::collections::BinaryHeap; use std::env; +use std::ffi::OsString; use std::fs::File; use std::hash::{Hash, Hasher}; use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Lines, Read, Write}; use std::mem::replace; use std::ops::{Range, RangeInclusive}; -use std::path::Path; +use std::path::{Path, PathBuf}; use uucore::fs::is_stdin_interactive; // for Iterator::dedup() -use extsort::*; -use std::str; -use serde::{Serialize, Deserialize}; -use std::ffi::OsString; -use std::usize; -use std::path::PathBuf; -use std::string::*; -use rayon::prelude::*; static NAME: &str = "sort"; static ABOUT: &str = "Display sorted concatenation of all FILE(s)."; @@ -255,30 +251,39 @@ struct Line { impl Sortable for Line { fn encode(&self, write: &mut W) { - let line = Line {line: self.line.to_owned(), selections: self.selections.to_owned() }; + let line = Line { + line: self.line.to_owned(), + selections: self.selections.to_owned(), + }; let serialized = serde_json::ser::to_string(&line).unwrap(); // Each instance of valid JSON needs to be seperated by something, so here we use a newline - write.write_all(format!("{}{}", serialized, "\n").as_bytes()).unwrap(); + write + .write_all(format!("{}{}", serialized, "\n").as_bytes()) + .unwrap(); } // This crate asks us to write one Line at a time, but returns multiple Lines to us(?). - // However, this crate also expects us to return a result of Option, - // so we concat the these lines into a single Option. So, it's possible this is broken, + // However, this crate also expects us to return a result of Option, + // so we concat the these lines into a single Option. So, it's possible this is broken, // and/or needs to be tested more thoroughly. Perhaps we need to rethink our Line struct or rewrite a - // ext sorter ourselves. + // ext sorter ourselves. fn decode(read: &mut R) -> Option { let buf_reader = BufReader::new(read); let result = { - let mut line_joined= String::new(); - let mut selections_joined= SmallVec::new(); - for line in buf_reader.lines() { + let mut line_joined = String::new(); + let mut selections_joined = SmallVec::new(); + let p_iter = buf_reader.lines().peekable(); + for line in p_iter { let mut deserialized_line: Line = serde_json::de::from_str(&line.unwrap()).unwrap(); - line_joined = format!("{}\n{}", line_joined, deserialized_line.line); - // I think we've done our sorting already and these are irrelevant? @miDeb what's your sense? + line_joined = format!("{}\n{}\n", line_joined, deserialized_line.line); + // I think we've done our sorting already and these are irrelevant? + // @miDeb what's your sense? Could we just return an empty vec? selections_joined.append(&mut deserialized_line.selections); - selections_joined.dedup(); } - Some( Line {line: line_joined, selections: selections_joined} ) + Some(Line { + line: line_joined.strip_suffix("\n").unwrap().to_owned(), + selections: selections_joined, + }) }; result } @@ -798,6 +803,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 { ) .arg( Arg::with_name(OPT_BUF_SIZE) + .short("S") .long(OPT_BUF_SIZE) .help("sets the maximum SIZE of each segment in number of sorted items") .takes_value(true) @@ -805,6 +811,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 { ) .arg( Arg::with_name(OPT_TMP_DIR) + .short("T") .long(OPT_TMP_DIR) .help("use DIR for temporaries, not $TMPDIR or /tmp") .takes_value(true) @@ -875,13 +882,13 @@ pub fn uumain(args: impl uucore::Args) -> i32 { if matches.is_present(OPT_BUF_SIZE) { // 10K is the default extsort buffer, but that's too small, so we set at 10M - // Although the "default" is never used unless extsort options are given - settings.buffer_size = { + // Although the "default" is never used unless extsort options are given + settings.buffer_size = { let input = matches - .value_of(OPT_BUF_SIZE) - .map(String::from) - .unwrap_or( format! ( "{}", DEFAULT_BUF_SIZE ) ); - + .value_of(OPT_BUF_SIZE) + .map(String::from) + .unwrap_or(format!("{}", DEFAULT_BUF_SIZE)); + human_numeric_convert(&input) } } @@ -890,13 +897,16 @@ pub fn uumain(args: impl uucore::Args) -> i32 { let result = matches .value_of(OPT_TMP_DIR) .map(String::from) - .unwrap_or(DEFAULT_TMPDIR.to_owned() ); + .unwrap_or(DEFAULT_TMPDIR.to_owned()); settings.tmp_dir = PathBuf::from(format!(r"{}", result)); } else { for (key, value) in env::vars_os() { if key == OsString::from("TMPDIR") { - settings.tmp_dir = PathBuf::from(format!(r"{}", value.into_string().unwrap_or("/tmp".to_owned()))); - break + settings.tmp_dir = PathBuf::from(format!( + r"{}", + value.into_string().unwrap_or("/tmp".to_owned()) + )); + break; } settings.tmp_dir = PathBuf::from(DEFAULT_TMPDIR); } @@ -1019,12 +1029,9 @@ fn exec(files: Vec, settings: &GlobalSettings) -> i32 { if settings.check { return exec_check_file(&lines, &settings); } - - if ( settings.buffer_size != DEFAULT_BUF_SIZE ) || ( settings.tmp_dir.as_os_str() != DEFAULT_TMPDIR ) { - lines = ext_sort_by(lines, &settings); - } else { - sort_by(&mut lines, &settings); - } + + + lines = sort_by(lines, &settings); if settings.merge { if settings.unique { @@ -1079,20 +1086,18 @@ fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 { } } -fn ext_sort_by(lines: Vec, settings: &GlobalSettings) -> Vec { - let sorter = ExternalSorter::new().with_segment_size(settings.buffer_size).with_sort_dir(settings.tmp_dir.clone()).with_parallel_sort(); - let result = sorter.sort_by(lines.into_iter(), |a, b| compare_by(a, b, &settings)).unwrap().collect(); +fn sort_by(lines: Vec, settings: &GlobalSettings) -> Vec { + let sorter = ExternalSorter::new() + .with_segment_size(settings.buffer_size) + .with_sort_dir(settings.tmp_dir.clone()) + .with_parallel_sort(); + let result = sorter + .sort_by(lines.into_iter(), |a, b| compare_by(a, b, &settings)) + .unwrap() + .collect(); result } -fn sort_by(lines: &mut Vec, settings: &GlobalSettings) { - if settings.stable || settings.unique { - lines.par_sort_by(|a, b| compare_by(a, b, &settings)) - } else { - lines.par_sort_unstable_by(|a, b| compare_by(a, b, &settings)) - } -} - fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering { for (idx, selector) in global_settings.selectors.iter().enumerate() { let a_selection = &a.selections[idx]; @@ -1137,14 +1142,14 @@ fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering } } -// Brought back! Probably want to do through numstrcmp somehow now +// It's back to do conversions for command options! Probably want to do through numstrcmp somehow now fn human_numeric_convert(a: &str) -> usize { let num_part = leading_num_common(a); let (_, s) = a.split_at(num_part.len()); let num_part = permissive_f64_parse(num_part); let suffix = match s.parse().unwrap_or('\0') { // SI Units - 'K' => 1E3, + 'K' | 'k' => 1E3, 'M' => 1E6, 'G' => 1E9, 'T' => 1E12, @@ -1164,7 +1169,7 @@ fn default_compare(a: &str, b: &str) -> Ordering { a.cmp(b) } -// This function does the initial detection of numeric lines. +/// This function does the initial detection of numeric lines for FP compares. // Lines starting with a number or positive or negative sign. // It also strips the string of any thing that could never // be a number for the purposes of any type of numeric comparison. @@ -1195,7 +1200,7 @@ fn leading_num_common(a: &str) -> &str { s } -// This function cleans up the initial comparison done by leading_num_common for a general numeric compare. +/// This function cleans up the initial comparison done by leading_num_common for a general numeric compare. // In contrast to numeric compare, GNU general numeric/FP sort *should* recognize positive signs and // scientific notation, so we strip those lines only after the end of the following numeric string. // For example, 5e10KFD would be 5e10 or 5x10^10 and +10000HFKJFK would become 10000. @@ -1318,7 +1323,7 @@ fn month_parse(line: &str) -> Month { "" }; - match pattern.to_uppercase().as_ref() { + let result = match pattern.to_uppercase().as_ref() { "JAN" => Month::January, "FEB" => Month::February, "MAR" => Month::March, @@ -1332,7 +1337,8 @@ fn month_parse(line: &str) -> Month { "NOV" => Month::November, "DEC" => Month::December, _ => Month::Unknown, - } + }; + result } fn month_compare(a: &str, b: &str) -> Ordering {