From fcd48813e01b279604f286e87f7dcbacafe9759b Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Sun, 16 May 2021 21:13:37 +0200 Subject: [PATCH] sort: read files as chunks, off-thread Instead of using a BufReader and reading each line separately, allocating a String for each one, we read to a chunk. Lines are references to this chunk. This makes the allocator's job much easier and yields performance improvements. Chunks are read on a separate thread to further improve performance. --- Cargo.lock | 141 +++++++- src/uu/sort/BENCHMARKING.md | 15 +- src/uu/sort/Cargo.toml | 10 +- src/uu/sort/src/check.rs | 102 ++++++ src/uu/sort/src/chunks.rs | 202 +++++++++++ src/uu/sort/src/ext_sort.rs | 160 +++++++++ src/uu/sort/src/external_sort/LICENSE | 19 - src/uu/sort/src/external_sort/mod.rs | 93 ----- src/uu/sort/src/merge.rs | 223 ++++++++++++ src/uu/sort/src/sort.rs | 486 +++++++++----------------- tests/by-util/test_sort.rs | 4 +- 11 files changed, 1003 insertions(+), 452 deletions(-) create mode 100644 src/uu/sort/src/check.rs create mode 100644 src/uu/sort/src/chunks.rs create mode 100644 src/uu/sort/src/ext_sort.rs delete mode 100644 src/uu/sort/src/external_sort/LICENSE delete mode 100644 src/uu/sort/src/external_sort/mod.rs create mode 100644 src/uu/sort/src/merge.rs diff --git a/Cargo.lock b/Cargo.lock index 77957de80..feda68de5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,11 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" + [[package]] name = "advapi32-sys" version = "0.2.0" @@ -63,6 +69,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "binary-heap-plus" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f068638f8ff9e118a9361e66a411eff410e7fb3ecaa23bf9272324f8fc606d7" +dependencies = [ + "compare", +] + [[package]] name = "bit-set" version = "0.5.2" @@ -136,9 +151,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "cast" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc38c385bfd7e444464011bb24820f40dd1c76bcdfa1b78611cb7c2e5cafab75" +checksum = "57cdfa5d50aad6cb4d44dcab6101a7f79925bd59d82ca42f38a9856a28865374" dependencies = [ "rustc_version", ] @@ -198,6 +213,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "compare" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120133d4db2ec47efe2e26502ee984747630c67f51974fca0b6c1340cf2368d3" + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -999,6 +1020,29 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "ouroboros" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f52300b81ac4eeeb6c00c20f7e86556c427d9fb2d92b68fc73c22f331cd15" +dependencies = [ + "ouroboros_macro", + "stable_deref_trait", +] + +[[package]] +name = "ouroboros_macro" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41db02c8f8731cdd7a72b433c7900cce4bf245465b452c364bfd21f4566ab055" +dependencies = [ + "Inflector", + "proc-macro-error", + "proc-macro2", + "quote 1.0.9", + "syn", +] + [[package]] name = "output_vt100" version = "0.1.2" @@ -1027,6 +1071,15 @@ dependencies = [ "proc-macro-hack", ] +[[package]] +name = "pest" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" +dependencies = [ + "ucd-trie", +] + [[package]] name = "pkg-config" version = "0.3.19" @@ -1089,6 +1142,30 @@ dependencies = [ "output_vt100", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote 1.0.9", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote 1.0.9", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -1336,11 +1413,11 @@ checksum = "3e52c148ef37f8c375d49d5a73aa70713125b7f19095948a923f80afdeb22ec2" [[package]] name = "rustc_version" -version = "0.2.3" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" dependencies = [ - "semver", + "semver 0.11.0", ] [[package]] @@ -1370,7 +1447,16 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" dependencies = [ - "semver-parser", + "semver-parser 0.7.0", +] + +[[package]] +name = "semver" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" +dependencies = [ + "semver-parser 0.10.2", ] [[package]] @@ -1380,10 +1466,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] -name = "serde" -version = "1.0.125" +name = "semver-parser" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +dependencies = [ + "pest", +] + +[[package]] +name = "serde" +version = "1.0.126" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" [[package]] name = "serde_cbor" @@ -1397,9 +1492,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.125" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d" +checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" dependencies = [ "proc-macro2", "quote 1.0.9", @@ -1468,6 +1563,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.8.0" @@ -1627,6 +1728,12 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" +[[package]] +name = "ucd-trie" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" + [[package]] name = "unicode-segmentation" version = "1.7.1" @@ -2402,12 +2509,16 @@ dependencies = [ name = "uu_sort" version = "0.0.6" dependencies = [ + "binary-heap-plus", "clap", + "compare", "fnv", "itertools 0.10.0", + "memchr 2.4.0", + "ouroboros", "rand 0.7.3", "rayon", - "semver", + "semver 0.9.0", "tempdir", "unicode-width", "uucore", @@ -2720,6 +2831,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version_check" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" + [[package]] name = "void" version = "1.0.2" diff --git a/src/uu/sort/BENCHMARKING.md b/src/uu/sort/BENCHMARKING.md index 71c331105..52866719d 100644 --- a/src/uu/sort/BENCHMARKING.md +++ b/src/uu/sort/BENCHMARKING.md @@ -75,7 +75,20 @@ Try running commands with the `-S` option set to an amount of memory to be used, huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt` multiple times (this will add the contents of `shuffled_wordlist.txt` to itself). Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'` -` + +## Merging + +"Merge" sort merges already sorted files. It is a sub-step of external sorting, so benchmarking it separately may be helpful. + +- Splitting `shuffled_wordlist.txt` can be achieved by running `split shuffled_wordlist.txt shuffled_wordlist_slice_ --additional-suffix=.txt` +- Sort each part by running `for f in shuffled_wordlist_slice_*; do sort $f -o $f; done` +- Benchmark merging by running `hyperfine "target/release/coreutils sort -m shuffled_wordlist_slice_*"` + +## Check + +When invoked with -c, we simply check if the input is already ordered. The input for benchmarking should be an already sorted file. + +- Benchmark checking by running `hyperfine "target/release/coreutils sort -c sorted_wordlist.txt"` ## Stdout and stdin performance diff --git a/src/uu/sort/Cargo.toml b/src/uu/sort/Cargo.toml index 5221f1f4e..724744dc4 100644 --- a/src/uu/sort/Cargo.toml +++ b/src/uu/sort/Cargo.toml @@ -15,16 +15,20 @@ edition = "2018" path = "src/sort.rs" [dependencies] -rayon = "1.5" -rand = "0.7" +binary-heap-plus = "0.4.1" clap = "2.33" +compare = "0.1.0" fnv = "1.0.7" itertools = "0.10.0" +memchr = "2.4.0" +ouroboros = "0.9.3" +rand = "0.7" +rayon = "1.5" semver = "0.9.0" +tempdir = "0.3.7" unicode-width = "0.1.8" uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] } uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" } -tempdir = "0.3.7" [[bin]] name = "sort" diff --git a/src/uu/sort/src/check.rs b/src/uu/sort/src/check.rs new file mode 100644 index 000000000..fe815b624 --- /dev/null +++ b/src/uu/sort/src/check.rs @@ -0,0 +1,102 @@ +// * This file is part of the uutils coreutils package. +// * +// * (c) Michael Debertol +// * +// * For the full copyright and license information, please view the LICENSE +// * file that was distributed with this source code. + +//! Check if a file is ordered + +use crate::{ + chunks::{self, Chunk}, + compare_by, open, GlobalSettings, +}; +use itertools::Itertools; +use std::{ + cmp::Ordering, + io::Read, + iter, + sync::mpsc::{sync_channel, Receiver, SyncSender}, + thread, +}; + +/// Check if the file at `path` is ordered. +/// +/// # Returns +/// +/// The code we should exit with. +pub fn check(path: &str, settings: &GlobalSettings) -> i32 { + let file = open(path).expect("failed to open input file"); + let (recycled_sender, recycled_receiver) = sync_channel(2); + let (loaded_sender, loaded_receiver) = sync_channel(2); + thread::spawn({ + let settings = settings.clone(); + move || reader(file, recycled_receiver, loaded_sender, &settings) + }); + for _ in 0..2 { + recycled_sender + .send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new())) + .unwrap(); + } + + let mut prev_chunk: Option = None; + let mut line_idx = 0; + for chunk in loaded_receiver.iter() { + line_idx += 1; + if let Some(prev_chunk) = prev_chunk.take() { + // Check if the first element of the new chunk is greater than the last + // element from the previous chunk + let prev_last = prev_chunk.borrow_lines().last().unwrap(); + let new_first = chunk.borrow_lines().first().unwrap(); + + if compare_by(prev_last, new_first, &settings) == Ordering::Greater { + if !settings.check_silent { + println!("sort: {}:{}: disorder: {}", path, line_idx, new_first.line); + } + return 1; + } + recycled_sender.send(prev_chunk).ok(); + } + + for (a, b) in chunk.borrow_lines().iter().tuple_windows() { + line_idx += 1; + if compare_by(a, b, &settings) == Ordering::Greater { + if !settings.check_silent { + println!("sort: {}:{}: disorder: {}", path, line_idx, b.line); + } + return 1; + } + } + + prev_chunk = Some(chunk); + } + 0 +} + +/// The function running on the reader thread. +fn reader( + mut file: Box, + receiver: Receiver, + sender: SyncSender, + settings: &GlobalSettings, +) { + let mut sender = Some(sender); + let mut carry_over = vec![]; + for chunk in receiver.iter() { + let (recycled_lines, recycled_buffer) = chunk.recycle(); + chunks::read( + &mut sender, + recycled_buffer, + &mut carry_over, + &mut file, + &mut iter::empty(), + if settings.zero_terminated { + b'\0' + } else { + b'\n' + }, + recycled_lines, + settings, + ) + } +} diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs new file mode 100644 index 000000000..c679980ec --- /dev/null +++ b/src/uu/sort/src/chunks.rs @@ -0,0 +1,202 @@ +// * This file is part of the uutils coreutils package. +// * +// * (c) Michael Debertol +// * +// * For the full copyright and license information, please view the LICENSE +// * file that was distributed with this source code. + +//! Utilities for reading files as chunks. + +use std::{ + io::{ErrorKind, Read}, + sync::mpsc::SyncSender, +}; + +use memchr::memchr_iter; +use ouroboros::self_referencing; + +use crate::{GlobalSettings, Line}; + +/// The chunk that is passed around between threads. +/// `lines` consist of slices into `buffer`. +#[self_referencing(pub_extras)] +#[derive(Debug)] +pub struct Chunk { + pub buffer: Vec, + #[borrows(buffer)] + #[covariant] + pub lines: Vec>, +} + +impl Chunk { + /// Destroy this chunk and return its components to be reused. + /// + /// # Returns + /// + /// * The `lines` vector, emptied + /// * The `buffer` vector, **not** emptied + pub fn recycle(mut self) -> (Vec>, Vec) { + let recycled_lines = self.with_lines_mut(|lines| { + lines.clear(); + unsafe { + // SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime, + // because the vector is empty. + // Transmuting is necessary to make recycling possible. See https://github.com/rust-lang/rfcs/pull/2802 + // for a rfc to make this unnecessary. Its example is similar to the code here. + std::mem::transmute::>, Vec>>(std::mem::take(lines)) + } + }); + (recycled_lines, self.into_heads().buffer) + } +} + +/// Read a chunk, parse lines and send them. +/// +/// No empty chunk will be sent. +/// +/// # Arguments +/// +/// * `sender_option`: The sender to send the lines to the sorter. If `None`, does nothing. +/// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled. +/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`) +/// * `carry_over`: The bytes that must be carried over in between invocations. +/// * `file`: The current file. +/// * `next_files`: What `file` should be updated to next. +/// * `separator`: The line separator. +/// * `lines`: The recycled vector to fill with lines. Must be empty. +/// * `settings`: The global settings. +#[allow(clippy::too_many_arguments)] +pub fn read( + sender_option: &mut Option>, + mut buffer: Vec, + carry_over: &mut Vec, + file: &mut Box, + next_files: &mut impl Iterator>, + separator: u8, + lines: Vec>, + settings: &GlobalSettings, +) { + assert!(lines.is_empty()); + if let Some(sender) = sender_option { + if buffer.len() < carry_over.len() { + buffer.resize(carry_over.len() + 10 * 1024, 0); + } + buffer[..carry_over.len()].copy_from_slice(&carry_over); + let (read, should_continue) = + read_to_buffer(file, next_files, &mut buffer, carry_over.len(), separator); + carry_over.clear(); + carry_over.extend_from_slice(&buffer[read..]); + + let payload = Chunk::new(buffer, |buf| { + let mut lines = unsafe { + // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime, + // because it was only temporarily transmuted to a Vec> to make recycling possible. + std::mem::transmute::>, Vec>>(lines) + }; + let read = crash_if_err!(1, std::str::from_utf8(&buf[..read])); + parse_lines(read, &mut lines, separator, &settings); + lines + }); + if !payload.borrow_lines().is_empty() { + sender.send(payload).unwrap(); + } + if !should_continue { + *sender_option = None; + } + } +} + +/// Split `read` into `Line`s, and add them to `lines`. +fn parse_lines<'a>( + mut read: &'a str, + lines: &mut Vec>, + separator: u8, + settings: &GlobalSettings, +) { + // Strip a trailing separator. TODO: Once our MinRustV is 1.45 or above, use strip_suffix() instead. + if read.ends_with(separator as char) { + read = &read[..read.len() - 1]; + } + + lines.extend( + read.split(separator as char) + .map(|line| Line::create(line, settings)), + ); +} + +/// Read from `file` into `buffer`. +/// +/// This function makes sure that at least two lines are read (unless we reach EOF and there's no next file), +/// growing the buffer if necessary. +/// The last line is likely to not have been fully read into the buffer. Its bytes must be copied to +/// the front of the buffer for the next invocation so that it can be continued to be read +/// (see the return values and `start_offset`). +/// +/// # Arguments +/// +/// * `file`: The file to start reading from. +/// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`, +/// and this function continues reading. +/// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset` +/// as well). It will not be grown by default, unless that is necessary to read at least two lines. +/// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over +/// from the previous read and should not be overwritten. +/// * `separator`: The byte that separates lines. +/// +/// # Returns +/// +/// * The amount of bytes in `buffer` that can now be interpreted as lines. +/// The remaining bytes must be copied to the start of the buffer for the next invocation, +/// if another invocation is necessary, which is determined by the other return value. +/// * Whether this function should be called again. +fn read_to_buffer( + file: &mut Box, + next_files: &mut impl Iterator>, + buffer: &mut Vec, + start_offset: usize, + separator: u8, +) -> (usize, bool) { + let mut read_target = &mut buffer[start_offset..]; + loop { + match file.read(read_target) { + Ok(0) => { + if read_target.is_empty() { + // chunk is full + let mut sep_iter = memchr_iter(separator, &buffer).rev(); + let last_line_end = sep_iter.next(); + if sep_iter.next().is_some() { + // We read enough lines. + let end = last_line_end.unwrap(); + // We want to include the separator here, because it shouldn't be carried over. + return (end + 1, true); + } else { + // We need to read more lines + let len = buffer.len(); + // resize the vector to 10 KB more + buffer.resize(len + 1024 * 10, 0); + read_target = &mut buffer[len..]; + } + } else { + // This file is empty. + if let Some(next_file) = next_files.next() { + // There is another file. + *file = next_file; + } else { + // This was the last file. + let leftover_len = read_target.len(); + return (buffer.len() - leftover_len, false); + } + } + } + Ok(n) => { + read_target = &mut read_target[n..]; + } + Err(e) if e.kind() == ErrorKind::Interrupted => { + // retry + } + Err(e) => { + crash!(1, "{}", e) + } + } + } +} diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs new file mode 100644 index 000000000..629ebb714 --- /dev/null +++ b/src/uu/sort/src/ext_sort.rs @@ -0,0 +1,160 @@ +// * This file is part of the uutils coreutils package. +// * +// * (c) Michael Debertol +// * +// * For the full copyright and license information, please view the LICENSE +// * file that was distributed with this source code. + +//! Sort big files by using files for storing intermediate chunks. +//! +//! Files are read into chunks of memory which are then sorted individually and +//! written to temporary files. There are two threads: One sorter, and one reader/writer. +//! The buffers for the individual chunks are recycled. There are two buffers. + +use std::io::{BufWriter, Write}; +use std::path::Path; +use std::{ + fs::OpenOptions, + io::Read, + sync::mpsc::{Receiver, SyncSender}, + thread, +}; + +use tempdir::TempDir; + +use crate::{ + chunks::{self, Chunk}, + merge::{self, FileMerger}, + sort_by, GlobalSettings, +}; + +/// Iterator that wraps the +pub struct ExtSortedMerger<'a> { + pub file_merger: FileMerger<'a>, + // Keep _tmp_dir around, as it is deleted when dropped. + _tmp_dir: TempDir, +} + +/// Sort big files by using files for storing intermediate chunks. +/// +/// # Returns +/// +/// An iterator that merges intermediate files back together. +pub fn ext_sort<'a>( + files: &mut impl Iterator>, + settings: &'a GlobalSettings, +) -> ExtSortedMerger<'a> { + let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort")); + let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1); + let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1); + thread::spawn({ + let settings = settings.clone(); + move || sorter(recycled_receiver, sorted_sender, settings) + }); + let chunks_read = reader_writer( + files, + &tmp_dir, + if settings.zero_terminated { + b'\0' + } else { + b'\n' + }, + // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly + // around settings.buffer_size as a whole. + settings.buffer_size / 10, + settings.clone(), + sorted_receiver, + recycled_sender, + ); + let files = (0..chunks_read) + .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string())) + .collect::>(); + + ExtSortedMerger { + file_merger: merge::merge(&files, settings), + _tmp_dir: tmp_dir, + } +} + +/// The function that is executed on the sorter thread. +fn sorter(receiver: Receiver, sender: SyncSender, settings: GlobalSettings) { + while let Ok(mut payload) = receiver.recv() { + payload.with_lines_mut(|lines| sort_by(lines, &settings)); + sender.send(payload).unwrap(); + } +} + +/// The function that is executed on the reader/writer thread. +/// +/// # Returns +/// * The number of chunks read. +fn reader_writer( + mut files: impl Iterator>, + tmp_dir: &TempDir, + separator: u8, + buffer_size: usize, + settings: GlobalSettings, + receiver: Receiver, + sender: SyncSender, +) -> usize { + let mut sender_option = Some(sender); + + let mut file = files.next().unwrap(); + + let mut carry_over = vec![]; + // kick things off with two reads + for _ in 0..2 { + chunks::read( + &mut sender_option, + vec![0; buffer_size], + &mut carry_over, + &mut file, + &mut files, + separator, + Vec::new(), + &settings, + ) + } + + let mut file_number = 0; + loop { + let mut chunk = match receiver.recv() { + Ok(it) => it, + _ => return file_number, + }; + + write( + &mut chunk, + &tmp_dir.path().join(file_number.to_string()), + separator, + ); + + let (recycled_lines, recycled_buffer) = chunk.recycle(); + + file_number += 1; + + chunks::read( + &mut sender_option, + recycled_buffer, + &mut carry_over, + &mut file, + &mut files, + separator, + recycled_lines, + &settings, + ); + } +} + +/// Write the lines in `chunk` to `file`, separated by `separator`. +fn write(chunk: &mut Chunk, file: &Path, separator: u8) { + chunk.with_lines_mut(|lines| { + // Write the lines to the file + let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file)); + let mut writer = BufWriter::new(file); + for s in lines.iter() { + crash_if_err!(1, writer.write_all(s.line.as_bytes())); + crash_if_err!(1, writer.write_all(&[separator])); + } + }); +} diff --git a/src/uu/sort/src/external_sort/LICENSE b/src/uu/sort/src/external_sort/LICENSE deleted file mode 100644 index e26c89c9f..000000000 --- a/src/uu/sort/src/external_sort/LICENSE +++ /dev/null @@ -1,19 +0,0 @@ -Copyright 2018 Battelle Memorial Institute - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/src/uu/sort/src/external_sort/mod.rs b/src/uu/sort/src/external_sort/mod.rs deleted file mode 100644 index af6902367..000000000 --- a/src/uu/sort/src/external_sort/mod.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::fs::OpenOptions; -use std::io::{BufWriter, Write}; -use std::path::Path; - -use tempdir::TempDir; - -use crate::{file_to_lines_iter, FileMerger}; - -use super::{GlobalSettings, Line}; - -/// Iterator that provides sorted `T`s -pub struct ExtSortedIterator<'a> { - file_merger: FileMerger<'a>, - // Keep tmp_dir around, it is deleted when dropped. - _tmp_dir: TempDir, -} - -impl<'a> Iterator for ExtSortedIterator<'a> { - type Item = Line; - fn next(&mut self) -> Option { - self.file_merger.next() - } -} - -/// Sort (based on `compare`) the `T`s provided by `unsorted` and return an -/// iterator -/// -/// # Panics -/// -/// This method can panic due to issues writing intermediate sorted chunks -/// to disk. -pub fn ext_sort( - unsorted: impl Iterator, - settings: &GlobalSettings, -) -> ExtSortedIterator { - let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort")); - - let mut total_read = 0; - let mut chunk = Vec::new(); - - let mut chunks_read = 0; - let mut file_merger = FileMerger::new(settings); - - // make the initial chunks on disk - for seq in unsorted { - let seq_size = seq.estimate_size(); - total_read += seq_size; - - chunk.push(seq); - - if total_read >= settings.buffer_size && chunk.len() >= 2 { - super::sort_by(&mut chunk, &settings); - - let file_path = tmp_dir.path().join(chunks_read.to_string()); - write_chunk(settings, &file_path, &mut chunk); - chunk.clear(); - total_read = 0; - chunks_read += 1; - - file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap())) - } - } - // write the last chunk - if !chunk.is_empty() { - super::sort_by(&mut chunk, &settings); - - let file_path = tmp_dir.path().join(chunks_read.to_string()); - write_chunk( - settings, - &tmp_dir.path().join(chunks_read.to_string()), - &mut chunk, - ); - - file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap())); - } - ExtSortedIterator { - file_merger, - _tmp_dir: tmp_dir, - } -} - -fn write_chunk(settings: &GlobalSettings, file: &Path, chunk: &mut Vec) { - let new_file = crash_if_err!(1, OpenOptions::new().create(true).append(true).open(file)); - let mut buf_write = BufWriter::new(new_file); - for s in chunk { - crash_if_err!(1, buf_write.write_all(s.line.as_bytes())); - crash_if_err!( - 1, - buf_write.write_all(if settings.zero_terminated { "\0" } else { "\n" }.as_bytes(),) - ); - } - crash_if_err!(1, buf_write.flush()); -} diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs new file mode 100644 index 000000000..6f7cdfed7 --- /dev/null +++ b/src/uu/sort/src/merge.rs @@ -0,0 +1,223 @@ +//! Merge already sorted files. +//! +//! We achieve performance by splitting the tasks of sorting and writing, and reading and parsing between two threads. +//! The threads communicate over channels. There's one channel per file in the direction reader -> sorter, but only +//! one channel from the sorter back to the reader. The channels to the sorter are used to send the read chunks. +//! The sorter reads the next chunk from the channel whenever it needs the next chunk after running out of lines +//! from the previous read of the file. The channel back from the sorter to the reader has two purposes: To allow the reader +//! to reuse memory allocations and to tell the reader which file to read from next. + +use std::{ + cmp::Ordering, + ffi::OsStr, + io::{Read, Write}, + iter, + rc::Rc, + sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}, + thread, +}; + +use compare::Compare; + +use crate::{ + chunks::{self, Chunk}, + compare_by, open, GlobalSettings, +}; + +// Merge already sorted files. +pub fn merge<'a>(files: &[impl AsRef], settings: &'a GlobalSettings) -> FileMerger<'a> { + let (request_sender, request_receiver) = channel(); + let mut reader_files = Vec::with_capacity(files.len()); + let mut loaded_receivers = Vec::with_capacity(files.len()); + for (file_number, file) in files.iter().filter_map(open).enumerate() { + let (sender, receiver) = sync_channel(2); + loaded_receivers.push(receiver); + reader_files.push(ReaderFile { + file, + sender: Some(sender), + carry_over: vec![], + }); + request_sender + .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new()))) + .unwrap(); + } + + for file_number in 0..reader_files.len() { + request_sender + .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new()))) + .unwrap(); + } + + thread::spawn({ + let settings = settings.clone(); + move || { + reader( + request_receiver, + &mut reader_files, + &settings, + if settings.zero_terminated { + b'\0' + } else { + b'\n' + }, + ) + } + }); + + let mut mergeable_files = vec![]; + + for (file_number, receiver) in loaded_receivers.into_iter().enumerate() { + mergeable_files.push(MergeableFile { + current_chunk: Rc::new(receiver.recv().unwrap()), + file_number, + line_idx: 0, + receiver, + }) + } + + FileMerger { + heap: binary_heap_plus::BinaryHeap::from_vec_cmp( + mergeable_files, + FileComparator { settings }, + ), + request_sender, + prev: None, + } +} +/// The struct on the reader thread representing an input file +struct ReaderFile { + file: Box, + sender: Option>, + carry_over: Vec, +} + +/// The function running on the reader thread. +fn reader( + recycled_receiver: Receiver<(usize, Chunk)>, + files: &mut [ReaderFile], + settings: &GlobalSettings, + separator: u8, +) { + for (file_idx, chunk) in recycled_receiver.iter() { + let (recycled_lines, recycled_buffer) = chunk.recycle(); + let ReaderFile { + file, + sender, + carry_over, + } = &mut files[file_idx]; + chunks::read( + sender, + recycled_buffer, + carry_over, + file, + &mut iter::empty(), + separator, + recycled_lines, + settings, + ); + } +} +/// The struct on the main thread representing an input file +pub struct MergeableFile { + current_chunk: Rc, + line_idx: usize, + receiver: Receiver, + file_number: usize, +} + +/// A struct to keep track of the previous line we encountered. +/// +/// This is required for deduplication purposes. +struct PreviousLine { + chunk: Rc, + line_idx: usize, + file_number: usize, +} + +/// Merges files together. This is **not** an iterator because of lifetime problems. +pub struct FileMerger<'a> { + heap: binary_heap_plus::BinaryHeap>, + request_sender: Sender<(usize, Chunk)>, + prev: Option, +} + +impl<'a> FileMerger<'a> { + /// Write the merged contents to the output file. + pub fn write_all(&mut self, settings: &GlobalSettings) { + let mut out = settings.out_writer(); + while self.write_next(settings, &mut out) {} + } + + fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool { + if let Some(file) = self.heap.peek() { + let prev = self.prev.replace(PreviousLine { + chunk: file.current_chunk.clone(), + line_idx: file.line_idx, + file_number: file.file_number, + }); + + file.current_chunk.with_lines(|lines| { + let current_line = &lines[file.line_idx]; + if settings.unique { + if let Some(prev) = &prev { + let cmp = compare_by( + &prev.chunk.borrow_lines()[prev.line_idx], + current_line, + settings, + ); + if cmp == Ordering::Equal { + return; + } + } + } + current_line.print(out, settings); + }); + + let was_last_line_for_file = + file.current_chunk.borrow_lines().len() == file.line_idx + 1; + + if was_last_line_for_file { + if let Ok(next_chunk) = file.receiver.recv() { + let mut file = self.heap.peek_mut().unwrap(); + file.current_chunk = Rc::new(next_chunk); + file.line_idx = 0; + } else { + self.heap.pop(); + } + } else { + self.heap.peek_mut().unwrap().line_idx += 1; + } + + if let Some(prev) = prev { + if let Ok(prev_chunk) = Rc::try_unwrap(prev.chunk) { + self.request_sender + .send((prev.file_number, prev_chunk)) + .ok(); + } + } + } + !self.heap.is_empty() + } +} + +/// Compares files by their current line. +struct FileComparator<'a> { + settings: &'a GlobalSettings, +} + +impl<'a> Compare for FileComparator<'a> { + fn compare(&self, a: &MergeableFile, b: &MergeableFile) -> Ordering { + let mut cmp = compare_by( + &a.current_chunk.borrow_lines()[a.line_idx], + &b.current_chunk.borrow_lines()[b.line_idx], + self.settings, + ); + if cmp == Ordering::Equal { + // To make sorting stable, we need to consider the file number as well, + // as lines from a file with a lower number are to be considered "earlier". + cmp = a.file_number.cmp(&b.file_number); + } + // Our BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering. + cmp.reverse() + } +} diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 2697d7df4..b6ab5a2b1 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -15,13 +15,16 @@ #[macro_use] extern crate uucore; +mod check; +mod chunks; mod custom_str_cmp; -mod external_sort; +mod ext_sort; +mod merge; mod numeric_str_cmp; use clap::{App, Arg}; use custom_str_cmp::custom_str_cmp; -use external_sort::ext_sort; +use ext_sort::ext_sort; use fnv::FnvHasher; use itertools::Itertools; use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings}; @@ -30,18 +33,15 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use semver::Version; use std::cmp::Ordering; -use std::collections::BinaryHeap; use std::env; use std::ffi::OsStr; use std::fs::File; use std::hash::{Hash, Hasher}; use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Write}; -use std::mem::replace; use std::ops::Range; use std::path::Path; use std::path::PathBuf; use unicode_width::UnicodeWidthStr; -use uucore::fs::is_stdin_interactive; // for Iterator::dedup() use uucore::InvalidEncodingHandling; static NAME: &str = "sort"; @@ -150,6 +150,19 @@ impl GlobalSettings { }; num_usize * suf_usize } + + fn out_writer(&self) -> BufWriter> { + match self.outfile { + Some(ref filename) => match File::create(Path::new(&filename)) { + Ok(f) => BufWriter::new(Box::new(f) as Box), + Err(e) => { + show_error!("{0}: {1}", filename, e.to_string()); + panic!("Could not open output file"); + } + }, + None => BufWriter::new(Box::new(stdout()) as Box), + } + } } impl Default for GlobalSettings { @@ -205,29 +218,7 @@ impl From<&GlobalSettings> for KeySettings { } } -#[derive(Debug, Clone)] -/// Represents the string selected by a FieldSelector. -struct SelectionRange { - range: Range, -} - -impl SelectionRange { - fn new(range: Range) -> Self { - Self { range } - } - - /// Gets the actual string slice represented by this Selection. - fn get_str<'a>(&self, line: &'a str) -> &'a str { - &line[self.range.to_owned()] - } - - fn shorten(&mut self, new_range: Range) { - self.range.end = self.range.start + new_range.end; - self.range.start += new_range.start; - } -} - -#[derive(Clone)] +#[derive(Clone, Debug)] enum NumCache { AsF64(GeneralF64ParseResult), WithInfo(NumInfo), @@ -248,64 +239,53 @@ impl NumCache { } } -#[derive(Clone)] -struct Selection { - range: SelectionRange, +#[derive(Clone, Debug)] +struct Selection<'a> { + slice: &'a str, num_cache: Option>, } -impl Selection { - /// Gets the actual string slice represented by this Selection. - fn get_str<'a>(&'a self, line: &'a Line) -> &'a str { - self.range.get_str(&line.line) - } -} - type Field = Range; -#[derive(Clone)] -pub struct Line { - line: Box, - // The common case is not to specify fields. Let's make this fast. - first_selection: Selection, - other_selections: Box<[Selection]>, +#[derive(Clone, Debug)] +pub struct Line<'a> { + line: &'a str, + selections: Box<[Selection<'a>]>, } -impl Line { - /// Estimate the number of bytes that this Line is occupying - pub fn estimate_size(&self) -> usize { - self.line.len() - + self.other_selections.len() * std::mem::size_of::() - + std::mem::size_of::() - } - - pub fn new(line: String, settings: &GlobalSettings) -> Self { +impl<'a> Line<'a> { + fn create(string: &'a str, settings: &GlobalSettings) -> Self { let fields = if settings .selectors .iter() - .any(|selector| selector.needs_tokens()) + .any(|selector| selector.needs_tokens) { // Only tokenize if we will need tokens. - Some(tokenize(&line, settings.separator)) + Some(tokenize(string, settings.separator)) } else { None }; - let mut selectors = settings.selectors.iter(); + Line { + line: string, + selections: settings + .selectors + .iter() + .filter(|selector| !selector.is_default_selection) + .map(|selector| selector.get_selection(string, fields.as_deref())) + .collect(), + } + } - let first_selection = selectors - .next() - .unwrap() - .get_selection(&line, fields.as_deref()); - - let other_selections: Vec = selectors - .map(|selector| selector.get_selection(&line, fields.as_deref())) - .collect(); - - Self { - line: line.into_boxed_str(), - first_selection, - other_selections: other_selections.into_boxed_slice(), + fn print(&self, writer: &mut impl Write, settings: &GlobalSettings) { + if settings.zero_terminated && !settings.debug { + crash_if_err!(1, writer.write_all(self.line.as_bytes())); + crash_if_err!(1, writer.write_all("\0".as_bytes())); + } else if !settings.debug { + crash_if_err!(1, writer.write_all(self.line.as_bytes())); + crash_if_err!(1, writer.write_all("\n".as_bytes())); + } else { + crash_if_err!(1, self.print_debug(settings, writer)); } } @@ -314,7 +294,7 @@ impl Line { fn print_debug( &self, settings: &GlobalSettings, - writer: &mut dyn Write, + writer: &mut impl Write, ) -> std::io::Result<()> { // We do not consider this function performance critical, as debug output is only useful for small files, // which are not a performance problem in any case. Therefore there aren't any special performance @@ -575,23 +555,39 @@ struct FieldSelector { from: KeyPosition, to: Option, settings: KeySettings, + needs_tokens: bool, + // Whether the selection for each line is going to be the whole line with no NumCache + is_default_selection: bool, } impl FieldSelector { - fn needs_tokens(&self) -> bool { - self.from.field != 1 || self.from.char == 0 || self.to.is_some() + fn new(from: KeyPosition, to: Option, settings: KeySettings) -> Self { + Self { + is_default_selection: from.field == 1 + && from.char == 1 + && to.is_none() + // TODO: Once our MinRustV is 1.42 or higher, change this to the matches! macro + && match settings.mode { + SortMode::Numeric | SortMode::GeneralNumeric | SortMode::HumanNumeric => false, + _ => true, + }, + needs_tokens: from.field != 1 || from.char == 0 || to.is_some(), + from, + to, + settings, + } } /// Get the selection that corresponds to this selector for the line. /// If needs_fields returned false, tokens may be None. - fn get_selection(&self, line: &str, tokens: Option<&[Field]>) -> Selection { - let mut range = SelectionRange::new(self.get_range(&line, tokens)); + fn get_selection<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Selection<'a> { + let mut range = &line[self.get_range(&line, tokens)]; let num_cache = if self.settings.mode == SortMode::Numeric || self.settings.mode == SortMode::HumanNumeric { // Parse NumInfo for this number. let (info, num_range) = NumInfo::parse( - range.get_str(&line), + range, NumInfoParseSettings { accept_si_units: self.settings.mode == SortMode::HumanNumeric, thousands_separator: Some(THOUSANDS_SEP), @@ -599,19 +595,21 @@ impl FieldSelector { }, ); // Shorten the range to what we need to pass to numeric_str_cmp later. - range.shorten(num_range); + range = &range[num_range]; Some(Box::new(NumCache::WithInfo(info))) } else if self.settings.mode == SortMode::GeneralNumeric { // Parse this number as f64, as this is the requirement for general numeric sorting. - let str = range.get_str(&line); Some(Box::new(NumCache::AsF64(general_f64_parse( - &str[get_leading_gen(str)], + &range[get_leading_gen(range)], )))) } else { // This is not a numeric sort, so we don't need a NumCache. None }; - Selection { range, num_cache } + Selection { + slice: range, + num_cache, + } } /// Look up the range in the line that corresponds to this selector. @@ -701,91 +699,6 @@ impl FieldSelector { } } -struct MergeableFile<'a> { - lines: Box + 'a>, - current_line: Line, - settings: &'a GlobalSettings, - file_index: usize, -} - -// BinaryHeap depends on `Ord`. Note that we want to pop smallest items -// from the heap first, and BinaryHeap.pop() returns the largest, so we -// trick it into the right order by calling reverse() here. -impl<'a> Ord for MergeableFile<'a> { - fn cmp(&self, other: &MergeableFile) -> Ordering { - let comparison = compare_by(&self.current_line, &other.current_line, self.settings); - if comparison == Ordering::Equal { - // If lines are equal, the earlier file takes precedence. - self.file_index.cmp(&other.file_index) - } else { - comparison - } - .reverse() - } -} - -impl<'a> PartialOrd for MergeableFile<'a> { - fn partial_cmp(&self, other: &MergeableFile) -> Option { - Some(self.cmp(other)) - } -} - -impl<'a> PartialEq for MergeableFile<'a> { - fn eq(&self, other: &MergeableFile) -> bool { - Ordering::Equal == self.cmp(other) - } -} - -impl<'a> Eq for MergeableFile<'a> {} - -struct FileMerger<'a> { - heap: BinaryHeap>, - settings: &'a GlobalSettings, -} - -impl<'a> FileMerger<'a> { - fn new(settings: &'a GlobalSettings) -> FileMerger<'a> { - FileMerger { - heap: BinaryHeap::new(), - settings, - } - } - fn push_file(&mut self, mut lines: Box + 'a>) { - if let Some(next_line) = lines.next() { - let mergeable_file = MergeableFile { - lines, - current_line: next_line, - settings: &self.settings, - file_index: self.heap.len(), - }; - self.heap.push(mergeable_file); - } - } -} - -impl<'a> Iterator for FileMerger<'a> { - type Item = Line; - fn next(&mut self) -> Option { - match self.heap.pop() { - Some(mut current) => { - match current.lines.next() { - Some(next_line) => { - let ret = replace(&mut current.current_line, next_line); - self.heap.push(current); - Some(ret) - } - _ => { - // Don't put it back in the heap (it's empty/erroring) - // but its first line is still valid. - Some(current.current_line) - } - } - } - None => None, - } - } -} - fn get_usage() -> String { format!( "{0} {1} @@ -985,7 +898,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 { let mut files = Vec::new(); for path in &files0_from { - let (reader, _) = open(path.as_str()).expect("Could not read from file specified."); + let reader = open(path.as_str()).expect("Could not read from file specified."); let buf_reader = BufReader::new(reader); for line in buf_reader.split(b'\0').flatten() { files.push( @@ -1112,11 +1025,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 { let to = from_to .next() .map(|to| KeyPosition::parse(to, 0, &mut key_settings)); - let field_selector = FieldSelector { - from, - to, - settings: key_settings, - }; + let field_selector = FieldSelector::new(from, to, key_settings); settings.selectors.push(field_selector); } } @@ -1124,48 +1033,21 @@ pub fn uumain(args: impl uucore::Args) -> i32 { if !settings.stable || !matches.is_present(OPT_KEY) { // add a default selector matching the whole line let key_settings = KeySettings::from(&settings); - settings.selectors.push(FieldSelector { - from: KeyPosition { + settings.selectors.push(FieldSelector::new( + KeyPosition { field: 1, char: 1, ignore_blanks: key_settings.ignore_blanks, }, - to: None, - settings: key_settings, - }); + None, + key_settings, + )); } - exec(files, settings) + exec(&files, &settings) } -fn file_to_lines_iter( - file: impl AsRef, - settings: &'_ GlobalSettings, -) -> Option + '_> { - let (reader, _) = match open(file) { - Some(x) => x, - None => return None, - }; - - let buf_reader = BufReader::new(reader); - - Some( - buf_reader - .split(if settings.zero_terminated { - b'\0' - } else { - b'\n' - }) - .map(move |line| { - Line::new( - crash_if_err!(1, String::from_utf8(crash_if_err!(1, line))), - settings, - ) - }), - ) -} - -fn output_sorted_lines(iter: impl Iterator, settings: &GlobalSettings) { +fn output_sorted_lines<'a>(iter: impl Iterator>, settings: &GlobalSettings) { if settings.unique { print_sorted( iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal), @@ -1176,87 +1058,48 @@ fn output_sorted_lines(iter: impl Iterator, settings: &GlobalSettin } } -fn exec(files: Vec, settings: GlobalSettings) -> i32 { +fn exec(files: &[String], settings: &GlobalSettings) -> i32 { if settings.merge { - let mut file_merger = FileMerger::new(&settings); - for lines in files - .iter() - .filter_map(|file| file_to_lines_iter(file, &settings)) - { - file_merger.push_file(Box::new(lines)); + let mut file_merger = merge::merge(files, settings); + file_merger.write_all(settings); + } else if settings.check { + if files.len() > 1 { + crash!(1, "only one file allowed with -c"); } - output_sorted_lines(file_merger, &settings); + return check::check(files.first().unwrap(), settings); + } else if settings.ext_sort { + let mut lines = files.iter().filter_map(open); + + let mut sorted = ext_sort(&mut lines, &settings); + sorted.file_merger.write_all(settings); } else { - let lines = files - .iter() - .filter_map(|file| file_to_lines_iter(file, &settings)) - .flatten(); + let separator = if settings.zero_terminated { '\0' } else { '\n' }; + let mut lines = vec![]; + let mut full_string = String::new(); - if settings.check { - return exec_check_file(lines, &settings); - } + for mut file in files.iter().filter_map(open) { + crash_if_err!(1, file.read_to_string(&mut full_string)); - // Only use ext_sorter when we need to. - // Probably faster that we don't create - // an owned value each run - if settings.ext_sort { - let sorted_lines = ext_sort(lines, &settings); - output_sorted_lines(sorted_lines, &settings); - } else { - let mut lines = vec![]; - - // This is duplicated from fn file_to_lines_iter, but using that function directly results in a performance regression. - for (file, _) in files.iter().map(open).flatten() { - let buf_reader = BufReader::new(file); - for line in buf_reader.split(if settings.zero_terminated { - b'\0' - } else { - b'\n' - }) { - let string = crash_if_err!(1, String::from_utf8(crash_if_err!(1, line))); - lines.push(Line::new(string, &settings)); - } + if !full_string.ends_with(separator) { + full_string.push(separator); } - - sort_by(&mut lines, &settings); - output_sorted_lines(lines.into_iter(), &settings); } - } + if full_string.ends_with(separator) { + full_string.pop(); + } + + for line in full_string.split(if settings.zero_terminated { '\0' } else { '\n' }) { + lines.push(Line::create(line, &settings)); + } + + sort_by(&mut lines, &settings); + output_sorted_lines(lines.into_iter(), &settings); + } 0 } -fn exec_check_file(unwrapped_lines: impl Iterator, settings: &GlobalSettings) -> i32 { - // errors yields the line before each disorder, - // plus the last line (quirk of .coalesce()) - let mut errors = unwrapped_lines - .enumerate() - .coalesce(|(last_i, last_line), (i, line)| { - if compare_by(&last_line, &line, &settings) == Ordering::Greater { - Err(((last_i, last_line), (i, line))) - } else { - Ok((i, line)) - } - }); - if let Some((first_error_index, _line)) = errors.next() { - // Check for a second "error", as .coalesce() always returns the last - // line, no matter what our merging function does. - if let Some(_last_line_or_next_error) = errors.next() { - if !settings.check_silent { - println!("sort: disorder in line {}", first_error_index); - }; - 1 - } else { - // first "error" was actually the last line. - 0 - } - } else { - // unwrapped_lines was empty. Empty files are defined to be sorted. - 0 - } -} - -fn sort_by(unsorted: &mut Vec, settings: &GlobalSettings) { +fn sort_by<'a>(unsorted: &mut Vec>, settings: &GlobalSettings) { if settings.stable || settings.unique { unsorted.par_sort_by(|a, b| compare_by(a, b, &settings)) } else { @@ -1264,19 +1107,39 @@ fn sort_by(unsorted: &mut Vec, settings: &GlobalSettings) { } } -fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering { - for (idx, selector) in global_settings.selectors.iter().enumerate() { - let (a_selection, b_selection) = if idx == 0 { - (&a.first_selection, &b.first_selection) +fn compare_by<'a>(a: &Line<'a>, b: &Line<'a>, global_settings: &GlobalSettings) -> Ordering { + let mut idx = 0; + for selector in &global_settings.selectors { + let mut _selections = None; + let (a_selection, b_selection) = if selector.is_default_selection { + // We can select the whole line. + // We have to store the selections outside of the if-block so that they live long enough. + _selections = Some(( + Selection { + slice: a.line, + num_cache: None, + }, + Selection { + slice: b.line, + num_cache: None, + }, + )); + // Unwrap the selections again, and return references to them. + ( + &_selections.as_ref().unwrap().0, + &_selections.as_ref().unwrap().1, + ) } else { - (&a.other_selections[idx - 1], &b.other_selections[idx - 1]) + let selections = (&a.selections[idx], &b.selections[idx]); + idx += 1; + selections }; - let a_str = a_selection.get_str(a); - let b_str = b_selection.get_str(b); + let a_str = a_selection.slice; + let b_str = b_selection.slice; let settings = &selector.settings; let cmp: Ordering = if settings.random { - random_shuffle(a_str, b_str, global_settings.salt.clone()) + random_shuffle(a_str, b_str, &global_settings.salt) } else { match settings.mode { SortMode::Numeric | SortMode::HumanNumeric => numeric_str_cmp( @@ -1307,7 +1170,7 @@ fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering let cmp = if global_settings.random || global_settings.stable || global_settings.unique { Ordering::Equal } else { - a.line.cmp(&b.line) + a.line.cmp(b.line) }; if global_settings.reverse { @@ -1362,7 +1225,7 @@ fn get_leading_gen(input: &str) -> Range { leading_whitespace_len..input.len() } -#[derive(Copy, Clone, PartialEq, PartialOrd)] +#[derive(Copy, Clone, PartialEq, PartialOrd, Debug)] enum GeneralF64ParseResult { Invalid, NaN, @@ -1408,12 +1271,11 @@ fn get_hash(t: &T) -> u64 { s.finish() } -fn random_shuffle(a: &str, b: &str, x: String) -> Ordering { +fn random_shuffle(a: &str, b: &str, salt: &str) -> Ordering { #![allow(clippy::comparison_chain)] - let salt_slice = x.as_str(); - let da = get_hash(&[a, salt_slice].concat()); - let db = get_hash(&[b, salt_slice].concat()); + let da = get_hash(&[a, salt].concat()); + let db = get_hash(&[b, salt].concat()); da.cmp(&db) } @@ -1504,45 +1366,23 @@ fn version_compare(a: &str, b: &str) -> Ordering { } } -fn print_sorted>(iter: T, settings: &GlobalSettings) { - let mut file: Box = match settings.outfile { - Some(ref filename) => match File::create(Path::new(&filename)) { - Ok(f) => Box::new(BufWriter::new(f)) as Box, - Err(e) => { - show_error!("{0}: {1}", filename, e.to_string()); - panic!("Could not open output file"); - } - }, - None => Box::new(BufWriter::new(stdout())) as Box, - }; - if settings.zero_terminated && !settings.debug { - for line in iter { - crash_if_err!(1, file.write_all(line.line.as_bytes())); - crash_if_err!(1, file.write_all("\0".as_bytes())); - } - } else { - for line in iter { - if !settings.debug { - crash_if_err!(1, file.write_all(line.line.as_bytes())); - crash_if_err!(1, file.write_all("\n".as_bytes())); - } else { - crash_if_err!(1, line.print_debug(settings, &mut file)); - } - } +fn print_sorted<'a, T: Iterator>>(iter: T, settings: &GlobalSettings) { + let mut writer = settings.out_writer(); + for line in iter { + line.print(&mut writer, settings); } - crash_if_err!(1, file.flush()); } // from cat.rs -fn open(path: impl AsRef) -> Option<(Box, bool)> { +fn open(path: impl AsRef) -> Option> { let path = path.as_ref(); if path == "-" { let stdin = stdin(); - return Some((Box::new(stdin) as Box, is_stdin_interactive())); + return Some(Box::new(stdin) as Box); } match File::open(Path::new(path)) { - Ok(f) => Some((Box::new(f) as Box, false)), + Ok(f) => Some(Box::new(f) as Box), Err(e) => { show_error!("{0:?}: {1}", path, e.to_string()); None @@ -1568,7 +1408,7 @@ mod tests { let b = "Ted"; let c = get_rand_string(); - assert_eq!(Ordering::Equal, random_shuffle(a, b, c)); + assert_eq!(Ordering::Equal, random_shuffle(a, b, &c)); } #[test] @@ -1592,7 +1432,7 @@ mod tests { let b = "9"; let c = get_rand_string(); - assert_eq!(Ordering::Equal, random_shuffle(a, b, c)); + assert_eq!(Ordering::Equal, random_shuffle(a, b, &c)); } #[test] @@ -1631,10 +1471,12 @@ mod tests { fn test_line_size() { // We should make sure to not regress the size of the Line struct because // it is unconditional overhead for every line we sort. - assert_eq!(std::mem::size_of::(), 56); + assert_eq!(std::mem::size_of::(), 32); // These are the fields of Line: - assert_eq!(std::mem::size_of::>(), 16); - assert_eq!(std::mem::size_of::(), 24); + assert_eq!(std::mem::size_of::<&str>(), 16); assert_eq!(std::mem::size_of::>(), 16); + + // How big is a selection? Constant cost all lines pay when we need selections. + assert_eq!(std::mem::size_of::(), 24); } } diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index bad9d577e..e89d18054 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -122,7 +122,7 @@ fn test_check_zero_terminated_failure() { .arg("-c") .arg("zero-terminated.txt") .fails() - .stdout_is("sort: disorder in line 0\n"); + .stdout_is("sort: zero-terminated.txt:2: disorder: ../../fixtures/du\n"); } #[test] @@ -621,7 +621,7 @@ fn test_check() { .arg("-c") .arg("check_fail.txt") .fails() - .stdout_is("sort: disorder in line 4\n"); + .stdout_is("sort: check_fail.txt:6: disorder: 5\n"); new_ucmd!() .arg("-c")