diff --git a/Cargo.lock b/Cargo.lock
index 9c3aae831..43d491cef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -43,12 +43,6 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
-[[package]]
-name = "array-init"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6945cc5422176fc5e602e590c2878d2c2acd9a4fe20a4baa7c28022521698ec6"
-
 [[package]]
 name = "arrayref"
 version = "0.3.6"
@@ -710,9 +704,9 @@ checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3"
 
 [[package]]
 name = "heck"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac"
+checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
 dependencies = [
  "unicode-segmentation",
 ]
@@ -1393,12 +1387,9 @@ dependencies = [
 
 [[package]]
 name = "regex-automata"
-version = "0.1.9"
+version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
-dependencies = [
- "byteorder",
-]
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
 
 [[package]]
 name = "regex-syntax"
@@ -1511,9 +1502,9 @@ dependencies = [
 
 [[package]]
 name = "signal-hook-registry"
-version = "1.3.0"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
 dependencies = [
  "libc",
 ]
diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs
index 23567833b..dde6febd3 100644
--- a/src/uu/sort/src/chunks.rs
+++ b/src/uu/sort/src/chunks.rs
@@ -102,17 +102,17 @@ pub fn read(
         carry_over.clear();
         carry_over.extend_from_slice(&buffer[read..]);
 
-        let payload = Chunk::new(buffer, |buf| {
-            let mut lines = unsafe {
-                // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
-                // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
-                std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
-            };
-            let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
-            parse_lines(read, &mut lines, separator, &settings);
-            lines
-        });
-        if !payload.borrow_lines().is_empty() {
+        if read != 0 {
+            let payload = Chunk::new(buffer, |buf| {
+                let mut lines = unsafe {
+                    // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
+                    // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
+                    std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
+                };
+                let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
+                parse_lines(read, &mut lines, separator, &settings);
+                lines
+            });
             sender.send(payload).unwrap();
         }
         if !should_continue {
@@ -175,6 +175,7 @@ fn read_to_buffer(
     separator: u8,
 ) -> (usize, bool) {
     let mut read_target = &mut buffer[start_offset..];
+    let mut last_file_target_size = read_target.len();
     loop {
         match file.read(read_target) {
             Ok(0) => {
@@ -208,14 +209,27 @@ fn read_to_buffer(
                         read_target = &mut buffer[len..];
                     }
                 } else {
-                    // This file is empty.
+                    // This file has been fully read.
+                    let mut leftover_len = read_target.len();
+                    if last_file_target_size != leftover_len {
+                        // The file was not empty.
+                        let read_len = buffer.len() - leftover_len;
+                        if buffer[read_len - 1] != separator {
+                            // The file did not end with a separator. We have to insert one.
+                            buffer[read_len] = separator;
+                            leftover_len -= 1;
+                        }
+                        let read_len = buffer.len() - leftover_len;
+                        read_target = &mut buffer[read_len..];
+                    }
                     if let Some(next_file) = next_files.next() {
                         // There is another file.
+                        last_file_target_size = leftover_len;
                         *file = next_file;
                     } else {
                         // This was the last file.
-                        let leftover_len = read_target.len();
-                        return (buffer.len() - leftover_len, false);
+                        let read_len = buffer.len() - leftover_len;
+                        return (read_len, false);
                     }
                 }
             }
diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs
index 9b1845efa..c439adcdc 100644
--- a/src/uu/sort/src/ext_sort.rs
+++ b/src/uu/sort/src/ext_sort.rs
@@ -12,8 +12,12 @@
 //! The buffers for the individual chunks are recycled. There are two buffers.
 
 use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
 use std::io::{BufWriter, Write};
 use std::path::Path;
+use std::process::Child;
+use std::process::{Command, Stdio};
 use std::{
     fs::OpenOptions,
     io::Read,
@@ -25,12 +29,13 @@ use itertools::Itertools;
 
 use tempfile::TempDir;
 
+use crate::Line;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
 
-const MIN_BUFFER_SIZE: usize = 8_000;
+const START_BUFFER_SIZE: usize = 8_000;
 
 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
@@ -63,10 +68,31 @@ pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings
     );
     match read_result {
         ReadResult::WroteChunksToFile { chunks_written } => {
-            let files = (0..chunks_written)
-                .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
-                .collect::<Vec<_>>();
-            let mut merger = merge::merge(&files, settings);
+            let mut children = Vec::new();
+            let files = (0..chunks_written).map(|chunk_num| {
+                let file_path = tmp_dir.path().join(chunk_num.to_string());
+                let file = File::open(file_path).unwrap();
+                if let Some(compress_prog) = &settings.compress_prog {
+                    let mut command = Command::new(compress_prog);
+                    command.stdin(file).stdout(Stdio::piped()).arg("-d");
+                    let mut child = crash_if_err!(
+                        2,
+                        command.spawn().map_err(|err| format!(
+                            "couldn't execute compress program: errno {}",
+                            err.raw_os_error().unwrap()
+                        ))
+                    );
+                    let child_stdout = child.stdout.take().unwrap();
+                    children.push(child);
+                    Box::new(BufReader::new(child_stdout)) as Box<dyn Read + Send>
+                } else {
+                    Box::new(BufReader::new(file)) as Box<dyn Read + Send>
+                }
+            });
+            let mut merger = merge::merge_with_file_limit(files, settings);
+            for child in children {
+                assert_child_success(child, settings.compress_prog.as_ref().unwrap());
+            }
             merger.write_all(settings);
         }
         ReadResult::SortedSingleChunk(chunk) => {
@@ -132,7 +158,14 @@ fn reader_writer(
     for _ in 0..2 {
         chunks::read(
             &mut sender_option,
-            vec![0; MIN_BUFFER_SIZE],
+            vec![
+                0;
+                if START_BUFFER_SIZE < buffer_size {
+                    START_BUFFER_SIZE
+                } else {
+                    buffer_size
+                }
+            ],
             Some(buffer_size),
             &mut carry_over,
             &mut file,
@@ -171,6 +204,7 @@ fn reader_writer(
             write(
                 &mut chunk,
                 &tmp_dir.path().join(file_number.to_string()),
+                settings.compress_prog.as_deref(),
                 separator,
             );
 
@@ -193,14 +227,45 @@ fn reader_writer(
 }
 
 /// Write the lines in `chunk` to `file`, separated by `separator`.
-fn write(chunk: &mut Chunk, file: &Path, separator: u8) {
+/// `compress_prog` is used to optionally compress file contents.
+fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) {
     chunk.with_lines_mut(|lines| {
         // Write the lines to the file
         let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
-        let mut writer = BufWriter::new(file);
-        for s in lines.iter() {
-            crash_if_err!(1, writer.write_all(s.line.as_bytes()));
-            crash_if_err!(1, writer.write_all(&[separator]));
-        }
+        if let Some(compress_prog) = compress_prog {
+            let mut command = Command::new(compress_prog);
+            command.stdin(Stdio::piped()).stdout(file);
+            let mut child = crash_if_err!(
+                2,
+                command.spawn().map_err(|err| format!(
+                    "couldn't execute compress program: errno {}",
+                    err.raw_os_error().unwrap()
+                ))
+            );
+            let mut writer = BufWriter::new(child.stdin.take().unwrap());
+            write_lines(lines, &mut writer, separator);
+            writer.flush().unwrap();
+            drop(writer);
+            assert_child_success(child, compress_prog);
+        } else {
+            let mut writer = BufWriter::new(file);
+            write_lines(lines, &mut writer, separator);
+        };
     });
 }
+
+fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
+    for s in lines {
+        crash_if_err!(1, writer.write_all(s.line.as_bytes()));
+        crash_if_err!(1, writer.write_all(&[separator]));
+    }
+}
+
+fn assert_child_success(mut child: Child, program: &str) {
+    if !matches!(
+        child.wait().map(|e| e.code()),
+        Ok(Some(0)) | Ok(None) | Err(_)
+    ) {
+        crash!(2, "'{}' terminated abnormally", program)
+    }
+}
diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs
index 696353829..478b454b6 100644
--- a/src/uu/sort/src/merge.rs
+++ b/src/uu/sort/src/merge.rs
@@ -9,8 +9,8 @@
 
 use std::{
     cmp::Ordering,
-    ffi::OsStr,
-    io::{Read, Write},
+    fs::File,
+    io::{BufWriter, Read, Write},
     iter,
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
@@ -18,18 +18,69 @@ use std::{
 };
 
 use compare::Compare;
+use itertools::Itertools;
 
 use crate::{
     chunks::{self, Chunk},
-    compare_by, open, GlobalSettings,
+    compare_by, GlobalSettings,
 };
 
 // Merge already sorted files.
-pub fn merge<'a>(files: &[impl AsRef<OsStr>], settings: &'a GlobalSettings) -> FileMerger<'a> {
+pub fn merge_with_file_limit<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+    files: F,
+    settings: &GlobalSettings,
+) -> FileMerger {
+    if files.len() > settings.merge_batch_size {
+        let tmp_dir = tempfile::Builder::new()
+            .prefix("uutils_sort")
+            .tempdir_in(&settings.tmp_dir)
+            .unwrap();
+        let mut batch_number = 0;
+        let mut remaining_files = files.len();
+        let batches = files.chunks(settings.merge_batch_size);
+        let mut batches = batches.into_iter();
+        while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 {
+            remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
+            let mut merger = merge_without_limit(batches.next().unwrap(), settings);
+            let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap();
+            merger.write_all_to(settings, &mut BufWriter::new(tmp_file));
+            batch_number += 1;
+        }
+        let batch_files = (0..batch_number).map(|n| {
+            Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap())
+                as Box<dyn Read + Send>
+        });
+        if batch_number > settings.merge_batch_size {
+            assert!(batches.next().is_none());
+            merge_with_file_limit(
+                Box::new(batch_files) as Box<dyn ExactSizeIterator<Item = Box<dyn Read + Send>>>,
+                settings,
+            )
+        } else {
+            let final_batch = batches.next();
+            assert!(batches.next().is_none());
+            merge_without_limit(
+                batch_files.chain(final_batch.into_iter().flatten()),
+                settings,
+            )
+        }
+    } else {
+        merge_without_limit(files, settings)
+    }
+}
+
+/// Merge files without limiting how many files are concurrently open
+///
+/// It is the responsibility of the caller to ensure that `files` yields only
+/// as many files as we are allowed to open concurrently.
+fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
+    files: F,
+    settings: &GlobalSettings,
+) -> FileMerger {
     let (request_sender, request_receiver) = channel();
-    let mut reader_files = Vec::with_capacity(files.len());
-    let mut loaded_receivers = Vec::with_capacity(files.len());
-    for (file_number, file) in files.iter().map(open).enumerate() {
+    let mut reader_files = Vec::with_capacity(files.size_hint().0);
+    let mut loaded_receivers = Vec::with_capacity(files.size_hint().0);
+    for (file_number, file) in files.enumerate() {
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
         reader_files.push(ReaderFile {
@@ -146,7 +197,11 @@ impl<'a> FileMerger<'a> {
     /// Write the merged contents to the output file.
     pub fn write_all(&mut self, settings: &GlobalSettings) {
         let mut out = settings.out_writer();
-        while self.write_next(settings, &mut out) {}
+        self.write_all_to(settings, &mut out);
+    }
+
+    pub fn write_all_to(&mut self, settings: &GlobalSettings, out: &mut impl Write) {
+        while self.write_next(settings, out) {}
     }
 
     fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool {
diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index 5825e73bd..70e3325ad 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -95,6 +95,8 @@ static OPT_PARALLEL: &str = "parallel";
 static OPT_FILES0_FROM: &str = "files0-from";
 static OPT_BUF_SIZE: &str = "buffer-size";
 static OPT_TMP_DIR: &str = "temporary-directory";
+static OPT_COMPRESS_PROG: &str = "compress-program";
+static OPT_BATCH_SIZE: &str = "batch-size";
 
 static ARG_FILES: &str = "files";
 
@@ -155,6 +157,8 @@ pub struct GlobalSettings {
     zero_terminated: bool,
     buffer_size: usize,
     tmp_dir: PathBuf,
+    compress_prog: Option<String>,
+    merge_batch_size: usize,
 }
 
 impl GlobalSettings {
@@ -223,6 +227,8 @@ impl Default for GlobalSettings {
             zero_terminated: false,
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
+            compress_prog: None,
+            merge_batch_size: 16,
         }
     }
 }
@@ -1076,6 +1082,19 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
                 .takes_value(true)
                 .value_name("DIR"),
         )
+        .arg(
+            Arg::with_name(OPT_COMPRESS_PROG)
+                .long(OPT_COMPRESS_PROG)
+                .help("compress temporary files with PROG, decompress with PROG -d")
+                .long_help("PROG has to take input from stdin and output to stdout")
+                .value_name("PROG")
+        )
+        .arg(
+            Arg::with_name(OPT_BATCH_SIZE)
+                .long(OPT_BATCH_SIZE)
+                .help("Merge at most N_MERGE inputs at once.")
+                .value_name("N_MERGE")
+        )
         .arg(
             Arg::with_name(OPT_FILES0_FROM)
                 .long(OPT_FILES0_FROM)
@@ -1165,6 +1184,14 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         .map(PathBuf::from)
         .unwrap_or_else(env::temp_dir);
 
+    settings.compress_prog = matches.value_of(OPT_COMPRESS_PROG).map(String::from);
+
+    if let Some(n_merge) = matches.value_of(OPT_BATCH_SIZE) {
+        settings.merge_batch_size = n_merge
+            .parse()
+            .unwrap_or_else(|_| crash!(2, "invalid --batch-size argument '{}'", n_merge));
+    }
+
     settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
     settings.merge = matches.is_present(OPT_MERGE);
 
@@ -1240,7 +1267,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &
 
 fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
     if settings.merge {
-        let mut file_merger = merge::merge(files, settings);
+        let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings);
         file_merger.write_all(settings);
     } else if settings.check {
         if files.len() > 1 {
diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs
index 02636b027..75611abfc 100644
--- a/tests/by-util/test_sort.rs
+++ b/tests/by-util/test_sort.rs
@@ -792,3 +792,64 @@ fn test_nonexistent_file() {
 fn test_blanks() {
     test_helper("blanks", &["-b", "--ignore-blanks"]);
 }
+
+#[test]
+fn sort_multiple() {
+    new_ucmd!()
+        .args(&["no_trailing_newline1.txt", "no_trailing_newline2.txt"])
+        .succeeds()
+        .stdout_is("a\nb\nb\n");
+}
+
+#[test]
+fn sort_empty_chunk() {
+    new_ucmd!()
+        .args(&["-S", "40B"])
+        .pipe_in("a\na\n")
+        .succeeds()
+        .stdout_is("a\na\n");
+}
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_compress() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "--compress-program",
+            "gzip",
+            "-S",
+            "10",
+        ])
+        .succeeds()
+        .stdout_only_fixture("ext_sort.expected");
+}
+
+#[test]
+fn test_compress_fail() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "--compress-program",
+            "nonexistent-program",
+            "-S",
+            "10",
+        ])
+        .fails()
+        .stderr_only("sort: couldn't execute compress program: errno 2");
+}
+
+#[test]
+fn test_merge_batches() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "-S",
+            "150B",
+        ])
+        .succeeds()
+        .stdout_only_fixture("ext_sort.expected");
+}
diff --git a/tests/fixtures/sort/no_trailing_newline1.txt b/tests/fixtures/sort/no_trailing_newline1.txt
new file mode 100644
index 000000000..0a207c060
--- /dev/null
+++ b/tests/fixtures/sort/no_trailing_newline1.txt
@@ -0,0 +1,2 @@
+a
+b
\ No newline at end of file
diff --git a/tests/fixtures/sort/no_trailing_newline2.txt b/tests/fixtures/sort/no_trailing_newline2.txt
new file mode 100644
index 000000000..63d8dbd40
--- /dev/null
+++ b/tests/fixtures/sort/no_trailing_newline2.txt
@@ -0,0 +1 @@
+b
\ No newline at end of file