From 8d213219c7c4120bf70c926d79fe517b2daf1b29 Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Sun, 6 Jun 2021 12:07:06 +0200 Subject: [PATCH] sort: implement --compress-program --- src/uu/sort/src/ext_sort.rs | 75 ++++++++++++++++++++++++++++++++----- src/uu/sort/src/merge.rs | 10 +++-- src/uu/sort/src/sort.rs | 14 ++++++- tests/by-util/test_sort.rs | 31 +++++++++++++++ 4 files changed, 115 insertions(+), 15 deletions(-) diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index d344df428..2d8513e9f 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -12,8 +12,12 @@ //! The buffers for the individual chunks are recycled. There are two buffers. use std::cmp::Ordering; +use std::fs::File; +use std::io::BufReader; use std::io::{BufWriter, Write}; use std::path::Path; +use std::process::Child; +use std::process::{Command, Stdio}; use std::{ fs::OpenOptions, io::Read, @@ -25,6 +29,7 @@ use itertools::Itertools; use tempfile::TempDir; +use crate::Line; use crate::{ chunks::{self, Chunk}, compare_by, merge, output_sorted_lines, sort_by, GlobalSettings, @@ -63,10 +68,31 @@ pub fn ext_sort(files: &mut impl Iterator>, settings ); match read_result { ReadResult::WroteChunksToFile { chunks_written } => { - let files = (0..chunks_written) - .map(|chunk_num| tmp_dir.path().join(chunk_num.to_string())) - .collect::>(); - let mut merger = merge::merge(&files, settings); + let mut children = Vec::new(); + let files = (0..chunks_written).map(|chunk_num| { + let file_path = tmp_dir.path().join(chunk_num.to_string()); + let file = File::open(file_path).unwrap(); + if let Some(compress_prog) = &settings.compress_prog { + let mut command = Command::new(compress_prog); + command.stdin(file).stdout(Stdio::piped()).arg("-d"); + let mut child = crash_if_err!( + 2, + command.spawn().map_err(|err| format!( + "couldn't execute compress program: errno {}", + err.raw_os_error().unwrap() + )) + ); + let child_stdout = child.stdout.take().unwrap(); + children.push(child); + Box::new(BufReader::new(child_stdout)) as Box + } else { + Box::new(BufReader::new(file)) as Box + } + }); + let mut merger = merge::merge(files, settings); + for child in children { + assert_child_success(child, settings.compress_prog.as_ref().unwrap()); + } merger.write_all(settings); } ReadResult::SortedSingleChunk(chunk) => { @@ -178,6 +204,7 @@ fn reader_writer( write( &mut chunk, &tmp_dir.path().join(file_number.to_string()), + settings.compress_prog.as_deref(), separator, ); @@ -200,14 +227,42 @@ fn reader_writer( } /// Write the lines in `chunk` to `file`, separated by `separator`. -fn write(chunk: &mut Chunk, file: &Path, separator: u8) { +/// `compress_prog` is used to optionally compress file contents. +fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) { chunk.with_lines_mut(|lines| { // Write the lines to the file let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file)); - let mut writer = BufWriter::new(file); - for s in lines.iter() { - crash_if_err!(1, writer.write_all(s.line.as_bytes())); - crash_if_err!(1, writer.write_all(&[separator])); - } + if let Some(compress_prog) = compress_prog { + let mut command = Command::new(compress_prog); + command.stdin(Stdio::piped()).stdout(file); + let mut child = crash_if_err!( + 2, + command.spawn().map_err(|err| format!( + "couldn't execute compress program: errno {}", + err.raw_os_error().unwrap() + )) + ); + let mut writer = BufWriter::new(child.stdin.take().unwrap()); + write_lines(lines, &mut writer, separator); + writer.flush().unwrap(); + drop(writer); + assert_child_success(child, compress_prog); + } else { + let mut writer = BufWriter::new(file); + write_lines(lines, &mut writer, separator); + }; }); } + +fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) { + for s in lines { + crash_if_err!(1, writer.write_all(s.line.as_bytes())); + crash_if_err!(1, writer.write_all(&[separator])); + } +} + +fn assert_child_success(mut child: Child, program: &str) { + if !matches!(child.wait().map(|e| e.code()), Ok(Some(0)) | Ok(None)) { + crash!(2, "'{}' terminated abnormally", program) + } +} diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index 696353829..b47c58c08 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -9,7 +9,6 @@ use std::{ cmp::Ordering, - ffi::OsStr, io::{Read, Write}, iter, rc::Rc, @@ -21,15 +20,18 @@ use compare::Compare; use crate::{ chunks::{self, Chunk}, - compare_by, open, GlobalSettings, + compare_by, GlobalSettings, }; // Merge already sorted files. -pub fn merge<'a>(files: &[impl AsRef], settings: &'a GlobalSettings) -> FileMerger<'a> { +pub fn merge>>( + files: F, + settings: &GlobalSettings, +) -> FileMerger { let (request_sender, request_receiver) = channel(); let mut reader_files = Vec::with_capacity(files.len()); let mut loaded_receivers = Vec::with_capacity(files.len()); - for (file_number, file) in files.iter().map(open).enumerate() { + for (file_number, file) in files.enumerate() { let (sender, receiver) = sync_channel(2); loaded_receivers.push(receiver); reader_files.push(ReaderFile { diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 5825e73bd..6cdf051c1 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -95,6 +95,7 @@ static OPT_PARALLEL: &str = "parallel"; static OPT_FILES0_FROM: &str = "files0-from"; static OPT_BUF_SIZE: &str = "buffer-size"; static OPT_TMP_DIR: &str = "temporary-directory"; +static OPT_COMPRESS_PROG: &str = "compress-program"; static ARG_FILES: &str = "files"; @@ -155,6 +156,7 @@ pub struct GlobalSettings { zero_terminated: bool, buffer_size: usize, tmp_dir: PathBuf, + compress_prog: Option, } impl GlobalSettings { @@ -223,6 +225,7 @@ impl Default for GlobalSettings { zero_terminated: false, buffer_size: DEFAULT_BUF_SIZE, tmp_dir: PathBuf::new(), + compress_prog: None, } } } @@ -1076,6 +1079,13 @@ pub fn uumain(args: impl uucore::Args) -> i32 { .takes_value(true) .value_name("DIR"), ) + .arg( + Arg::with_name(OPT_COMPRESS_PROG) + .long(OPT_COMPRESS_PROG) + .help("compress temporary files with PROG, decompress with PROG -d") + .long_help("PROG has to take input from stdin and output to stdout") + .value_name("PROG") + ) .arg( Arg::with_name(OPT_FILES0_FROM) .long(OPT_FILES0_FROM) @@ -1165,6 +1175,8 @@ pub fn uumain(args: impl uucore::Args) -> i32 { .map(PathBuf::from) .unwrap_or_else(env::temp_dir); + settings.compress_prog = matches.value_of(OPT_COMPRESS_PROG).map(String::from); + settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED); settings.merge = matches.is_present(OPT_MERGE); @@ -1240,7 +1252,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator>, settings: & fn exec(files: &[String], settings: &GlobalSettings) -> i32 { if settings.merge { - let mut file_merger = merge::merge(files, settings); + let mut file_merger = merge::merge(files.iter().map(open), settings); file_merger.write_all(settings); } else if settings.check { if files.len() > 1 { diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index d100e2141..e731d5b1d 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -809,3 +809,34 @@ fn sort_empty_chunk() { .succeeds() .stdout_is("a\na\n"); } + +#[test] +#[cfg(target_os = "linux")] +fn test_compress() { + new_ucmd!() + .args(&[ + "ext_sort.txt", + "-n", + "--compress-program", + "gzip", + "-S", + "10", + ]) + .succeeds() + .stdout_only_fixture("ext_sort.expected"); +} + +#[test] +fn test_compress_fail() { + new_ucmd!() + .args(&[ + "ext_sort.txt", + "-n", + "--compress-program", + "nonexistent-program", + "-S", + "10", + ]) + .fails() + .stderr_only("sort: couldn't execute compress program: errno 2"); +}