diff --git a/src/uu/sort/src/check.rs b/src/uu/sort/src/check.rs
index a8e5a0d9b..f53e4edb4 100644
--- a/src/uu/sort/src/check.rs
+++ b/src/uu/sort/src/check.rs
@@ -34,9 +34,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
         move || reader(file, recycled_receiver, loaded_sender, &settings)
     });
     for _ in 0..2 {
-        recycled_sender
-            .send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()))
-            .unwrap();
+        let _ = recycled_sender.send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()));
     }
 
     let mut prev_chunk: Option<Chunk> = None;
@@ -55,7 +53,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
             }
             return 1;
         }
-        recycled_sender.send(prev_chunk).ok();
+        let _ = recycled_sender.send(prev_chunk);
     }
 
     for (a, b) in chunk.borrow_lines().iter().tuple_windows() {
@@ -80,12 +78,11 @@ fn reader(
     sender: SyncSender<Chunk>,
     settings: &GlobalSettings,
 ) {
-    let mut sender = Some(sender);
     let mut carry_over = vec![];
     for chunk in receiver.iter() {
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        chunks::read(
-            &mut sender,
+        let should_continue = chunks::read(
+            &sender,
             recycled_buffer,
             None,
             &mut carry_over,
             &mut file,
             &mut iter::empty(),
             if settings.zero_terminated {
                 b'\0'
             } else {
                 b'\n'
             },
             recycled_lines,
             settings,
-        )
+        );
+        if !should_continue {
+            break;
+        }
     }
 }
diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs
index 3d996e6d6..d452401df 100644
--- a/src/uu/sort/src/chunks.rs
+++ b/src/uu/sort/src/chunks.rs
@@ -52,17 +52,17 @@ impl Chunk {
 
 /// Read a chunk, parse lines and send them.
 ///
-/// No empty chunk will be sent. If we reach the end of the input, sender_option
-/// is set to None. If this function however does not set sender_option to None,
-/// it is not guaranteed that there is still input left: If the input fits _exactly_
-/// into a buffer, we will only notice that there's nothing more to read at the next
-/// invocation.
+/// No empty chunk will be sent. If we reach the end of the input, `false` is returned.
+/// However, if this function returns `true`, it is not guaranteed that there is still
+/// input left: If the input fits _exactly_ into a buffer, we will only notice that there's
+/// nothing more to read at the next invocation. In case there is no input left, nothing will
+/// be sent.
 ///
 /// # Arguments
 ///
 /// (see also `read_to_buffer` for more detailed documentation)
 ///
-/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
+/// * `sender`: The sender to send the lines to the sorter.
 /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
 ///   (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
 /// * `max_buffer_size`: How big `buffer` can be.
@@ -73,52 +73,47 @@ impl Chunk {
 /// * `lines`: The recycled vector to fill with lines. Must be empty.
 /// * `settings`: The global settings.
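+///
+/// A sketch of the calling convention (illustrative only; `sender`, `file`,
+/// and `settings` stand for values the caller already owns):
+///
+/// ```ignore
+/// let mut carry_over = vec![];
+/// loop {
+///     let should_continue = chunks::read(
+///         &sender,
+///         vec![0; 100 * 1024], // or a recycled buffer
+///         None,
+///         &mut carry_over,
+///         &mut file,
+///         &mut iter::empty(),
+///         b'\n',
+///         Vec::new(), // or recycled lines
+///         settings,
+///     );
+///     if !should_continue {
+///         break; // end of input reached
+///     }
+/// }
+/// ```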
 #[allow(clippy::too_many_arguments)]
-#[allow(clippy::borrowed_box)]
-pub fn read(
-    sender_option: &mut Option<SyncSender<Chunk>>,
+pub fn read<T: Read>(
+    sender: &SyncSender<Chunk>,
     mut buffer: Vec<u8>,
     max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
-    file: &mut Box<dyn Read + Send>,
-    next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+    file: &mut T,
+    next_files: &mut impl Iterator<Item = T>,
     separator: u8,
     lines: Vec<Line<'static>>,
     settings: &GlobalSettings,
-) {
+) -> bool {
     assert!(lines.is_empty());
-    if let Some(sender) = sender_option {
-        if buffer.len() < carry_over.len() {
-            buffer.resize(carry_over.len() + 10 * 1024, 0);
-        }
-        buffer[..carry_over.len()].copy_from_slice(carry_over);
-        let (read, should_continue) = read_to_buffer(
-            file,
-            next_files,
-            &mut buffer,
-            max_buffer_size,
-            carry_over.len(),
-            separator,
-        );
-        carry_over.clear();
-        carry_over.extend_from_slice(&buffer[read..]);
-
-        if read != 0 {
-            let payload = Chunk::new(buffer, |buf| {
-                let mut lines = unsafe {
-                    // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
-                    // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
-                    std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
-                };
-                let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
-                parse_lines(read, &mut lines, separator, settings);
-                lines
-            });
-            sender.send(payload).unwrap();
-        }
-        if !should_continue {
-            *sender_option = None;
-        }
+    if buffer.len() < carry_over.len() {
+        buffer.resize(carry_over.len() + 10 * 1024, 0);
     }
+    buffer[..carry_over.len()].copy_from_slice(carry_over);
+    let (read, should_continue) = read_to_buffer(
+        file,
+        next_files,
+        &mut buffer,
+        max_buffer_size,
+        carry_over.len(),
+        separator,
+    );
+    carry_over.clear();
+    carry_over.extend_from_slice(&buffer[read..]);
+
+    if read != 0 {
+        let payload = Chunk::new(buffer, |buf| {
+            let mut lines = unsafe {
+                // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
+                // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
+                std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
+            };
+            let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
+            parse_lines(read, &mut lines, separator, settings);
+            lines
+        });
+        sender.send(payload).unwrap();
+    }
+    should_continue
 }
 
 /// Split `read` into `Line`s, and add them to `lines`.
@@ -165,10 +160,9 @@ fn parse_lines<'a>(
 /// The remaining bytes must be copied to the start of the buffer for the next invocation,
 /// if another invocation is necessary, which is determined by the other return value.
 /// * Whether this function should be called again.
-#[allow(clippy::borrowed_box)]
-fn read_to_buffer(
-    file: &mut Box<dyn Read + Send>,
-    next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+fn read_to_buffer<T: Read>(
+    file: &mut T,
+    next_files: &mut impl Iterator<Item = T>,
     buffer: &mut Vec<u8>,
     max_buffer_size: Option<usize>,
     start_offset: usize,
diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs
index c439adcdc..022c57741 100644
--- a/src/uu/sort/src/ext_sort.rs
+++ b/src/uu/sort/src/ext_sort.rs
@@ -12,14 +12,10 @@
 //! The buffers for the individual chunks are recycled. There are two buffers.
 use std::cmp::Ordering;
-use std::fs::File;
-use std::io::BufReader;
-use std::io::{BufWriter, Write};
+use std::io::Write;
 use std::path::Path;
-use std::process::Child;
-use std::process::{Command, Stdio};
+use std::path::PathBuf;
 use std::{
-    fs::OpenOptions,
     io::Read,
     sync::mpsc::{Receiver, SyncSender},
     thread,
@@ -27,72 +23,78 @@
 
 use itertools::Itertools;
 
-use tempfile::TempDir;
-
+use crate::merge::ClosedTmpFile;
+use crate::merge::WriteableCompressedTmpFile;
+use crate::merge::WriteablePlainTmpFile;
+use crate::merge::WriteableTmpFile;
 use crate::Line;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
+use tempfile::TempDir;
 
 const START_BUFFER_SIZE: usize = 8_000;
 
 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
-    let tmp_dir = crash_if_err!(
-        1,
-        tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(&settings.tmp_dir)
-    );
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
     thread::spawn({
         let settings = settings.clone();
         move || sorter(recycled_receiver, sorted_sender, settings)
     });
-    let read_result = reader_writer(
+    if settings.compress_prog.is_some() {
+        reader_writer::<_, WriteableCompressedTmpFile>(
+            files,
+            settings,
+            sorted_receiver,
+            recycled_sender,
+        );
+    } else {
+        reader_writer::<_, WriteablePlainTmpFile>(
+            files,
+            settings,
+            sorted_receiver,
+            recycled_sender,
+        );
+    }
+}
+
+fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile + 'static>(
+    files: F,
+    settings: &GlobalSettings,
+    receiver: Receiver<Chunk>,
+    sender: SyncSender<Chunk>,
+) {
+    let separator = if settings.zero_terminated {
+        b'\0'
+    } else {
+        b'\n'
+    };
+
+    // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
+    // around settings.buffer_size as a whole.
+    let buffer_size = settings.buffer_size / 10;
+    let read_result: ReadResult<Tmp> = read_write_loop(
         files,
-        &tmp_dir,
-        if settings.zero_terminated {
-            b'\0'
-        } else {
-            b'\n'
-        },
+        &settings.tmp_dir,
+        separator,
-        // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
-        // around settings.buffer_size as a whole.
-        settings.buffer_size / 10,
-        settings.clone(),
-        sorted_receiver,
-        recycled_sender,
+        buffer_size,
+        settings,
+        receiver,
+        sender,
     );
     match read_result {
-        ReadResult::WroteChunksToFile { chunks_written } => {
-            let mut children = Vec::new();
-            let files = (0..chunks_written).map(|chunk_num| {
-                let file_path = tmp_dir.path().join(chunk_num.to_string());
-                let file = File::open(file_path).unwrap();
-                if let Some(compress_prog) = &settings.compress_prog {
-                    let mut command = Command::new(compress_prog);
-                    command.stdin(file).stdout(Stdio::piped()).arg("-d");
-                    let mut child = crash_if_err!(
-                        2,
-                        command.spawn().map_err(|err| format!(
-                            "couldn't execute compress program: errno {}",
-                            err.raw_os_error().unwrap()
-                        ))
-                    );
-                    let child_stdout = child.stdout.take().unwrap();
-                    children.push(child);
-                    Box::new(BufReader::new(child_stdout)) as Box<dyn Read + Send>
-                } else {
-                    Box::new(BufReader::new(file)) as Box<dyn Read + Send>
-                }
-            });
-            let mut merger = merge::merge_with_file_limit(files, settings);
-            for child in children {
-                assert_child_success(child, settings.compress_prog.as_ref().unwrap());
-            }
+        ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
+            let tmp_dir_size = tmp_files.len();
+            let mut merger = merge::merge_with_file_limit::<_, _, Tmp>(
+                tmp_files.into_iter().map(|c| c.reopen()),
+                settings,
+                Some((tmp_dir, tmp_dir_size)),
+            );
             merger.write_all(settings);
         }
         ReadResult::SortedSingleChunk(chunk) => {
@@ -122,7 +124,7 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) {
 }
 
 /// Describes how we read the chunks from the input.
-enum ReadResult {
+enum ReadResult<I: WriteableTmpFile> {
     /// The input was empty. Nothing was read.
     EmptyInput,
     /// The input fits into a single Chunk, which was kept in memory.
@@ -131,33 +133,27 @@ enum ReadResult {
     SortedTwoChunks([Chunk; 2]),
     /// The input was read into multiple chunks, which were written to auxiliary files.
     WroteChunksToFile {
-        /// The number of chunks written to auxiliary files.
-        chunks_written: usize,
+        tmp_files: Vec<I::Closed>,
+        tmp_dir: TempDir,
     },
 }
-
 /// The function that is executed on the reader/writer thread.
-///
-/// # Returns
-/// * The number of chunks read.
-fn reader_writer(
+fn read_write_loop<I: WriteableTmpFile>(
     mut files: impl Iterator<Item = Box<dyn Read + Send>>,
-    tmp_dir: &TempDir,
+    tmp_dir_parent: &Path,
     separator: u8,
     buffer_size: usize,
-    settings: GlobalSettings,
+    settings: &GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> ReadResult {
-    let mut sender_option = Some(sender);
-
+) -> ReadResult<I> {
     let mut file = files.next().unwrap();
     let mut carry_over = vec![];
     // kick things off with two reads
     for _ in 0..2 {
-        chunks::read(
-            &mut sender_option,
+        let should_continue = chunks::read(
+            &sender,
             vec![
                 0;
                 if START_BUFFER_SIZE < buffer_size {
@@ -172,9 +168,11 @@ fn reader_writer(
             &mut files,
             separator,
             Vec::new(),
-            &settings,
+            settings,
         );
-        if sender_option.is_none() {
+
+        if !should_continue {
+            drop(sender);
             // We have already read the whole input. Since we are in our first two reads,
             // this means that we can fit the whole input into memory. Bypass writing below and
             // handle this case in a more straightforward way.
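+            // (Dropping `sender` closes the channel to the sorter thread; its
+            // receive loop then ends once the chunks already sent are drained.)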
@@ -190,68 +188,69 @@ fn reader_writer(
         }
     }
 
+    let tmp_dir = crash_if_err!(
+        1,
+        tempfile::Builder::new()
+            .prefix("uutils_sort")
+            .tempdir_in(tmp_dir_parent)
+    );
+
+    let mut sender_option = Some(sender);
     let mut file_number = 0;
+    let mut tmp_files = vec![];
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
             _ => {
-                return ReadResult::WroteChunksToFile {
-                    chunks_written: file_number,
-                }
+                return ReadResult::WroteChunksToFile { tmp_files, tmp_dir };
             }
         };
-        write(
+        let tmp_file = write::<I>(
             &mut chunk,
-            &tmp_dir.path().join(file_number.to_string()),
+            tmp_dir.path().join(file_number.to_string()),
             settings.compress_prog.as_deref(),
             separator,
         );
+        tmp_files.push(tmp_file);
         file_number += 1;
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        chunks::read(
-            &mut sender_option,
-            recycled_buffer,
-            None,
-            &mut carry_over,
-            &mut file,
-            &mut files,
-            separator,
-            recycled_lines,
-            &settings,
-        );
+        if let Some(sender) = &sender_option {
+            let should_continue = chunks::read(
+                sender,
+                recycled_buffer,
+                None,
+                &mut carry_over,
+                &mut file,
+                &mut files,
+                separator,
+                recycled_lines,
+                settings,
+            );
+            if !should_continue {
+                sender_option = None;
+            }
+        }
     }
 }
 
 /// Write the lines in `chunk` to `file`, separated by `separator`.
 /// `compress_prog` is used to optionally compress file contents.
-fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) {
+fn write<I: WriteableTmpFile>(
+    chunk: &mut Chunk,
+    file: PathBuf,
+    compress_prog: Option<&str>,
+    separator: u8,
+) -> I::Closed {
     chunk.with_lines_mut(|lines| {
         // Write the lines to the file
-        let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
-        if let Some(compress_prog) = compress_prog {
-            let mut command = Command::new(compress_prog);
-            command.stdin(Stdio::piped()).stdout(file);
-            let mut child = crash_if_err!(
-                2,
-                command.spawn().map_err(|err| format!(
-                    "couldn't execute compress program: errno {}",
-                    err.raw_os_error().unwrap()
-                ))
-            );
-            let mut writer = BufWriter::new(child.stdin.take().unwrap());
-            write_lines(lines, &mut writer, separator);
-            writer.flush().unwrap();
-            drop(writer);
-            assert_child_success(child, compress_prog);
-        } else {
-            let mut writer = BufWriter::new(file);
-            write_lines(lines, &mut writer, separator);
-        };
-    });
+        let mut tmp_file = I::create(file, compress_prog);
+        write_lines(lines, tmp_file.as_write(), separator);
+        tmp_file.finished_writing()
+    })
 }
 
 fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
@@ -260,12 +259,3 @@ fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8)
         crash_if_err!(1, writer.write_all(&[separator]));
     }
 }
-
-fn assert_child_success(mut child: Child, program: &str) {
-    if !matches!(
-        child.wait().map(|e| e.code()),
-        Ok(Some(0)) | Ok(None) | Err(_)
-    ) {
-        crash!(2, "'{}' terminated abnormally", program)
-    }
-}
diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs
index 478b454b6..173faaffc 100644
--- a/src/uu/sort/src/merge.rs
+++ b/src/uu/sort/src/merge.rs
@@ -9,9 +9,11 @@
 use std::{
     cmp::Ordering,
-    fs::File,
+    fs::{self, File},
     io::{BufWriter, Read, Write},
     iter,
+    path::PathBuf,
+    process::{Child, ChildStdin, ChildStdout, Command, Stdio},
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
     thread,
@@ -19,61 +21,94 @@
 
 use compare::Compare;
 use itertools::Itertools;
+use tempfile::TempDir;
 
 use crate::{
     chunks::{self, Chunk},
     compare_by, GlobalSettings,
 };
 
-// Merge already sorted files.
-pub fn merge_with_file_limit<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
-    files: F,
+/// Merge pre-sorted `Box<dyn Read>`s.
+///
+/// If the number of `files` is greater than `settings.merge_batch_size`, intermediate files will be used.
+/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
+pub fn merge<Files: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+    files: Files,
     settings: &GlobalSettings,
+) -> FileMerger {
+    if settings.compress_prog.is_none() {
+        merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
+            files.map(|file| PlainMergeInput { inner: file }),
+            settings,
+            None,
+        )
+    } else {
+        merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
+            files.map(|file| PlainMergeInput { inner: file }),
+            settings,
+            None,
+        )
+    }
+}
+
+// Merge already sorted `MergeInput`s.
+pub fn merge_with_file_limit<
+    M: MergeInput + 'static,
+    F: ExactSizeIterator<Item = M>,
+    Tmp: WriteableTmpFile + 'static,
+>(
+    files: F,
+    settings: &GlobalSettings,
+    tmp_dir: Option<(TempDir, usize)>,
 ) -> FileMerger {
     if files.len() > settings.merge_batch_size {
-        let tmp_dir = tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(&settings.tmp_dir)
-            .unwrap();
-        let mut batch_number = 0;
+        // If we did not get a tmp_dir, create one.
+        let (tmp_dir, mut tmp_dir_size) = tmp_dir.unwrap_or_else(|| {
+            (
+                tempfile::Builder::new()
+                    .prefix("uutils_sort")
+                    .tempdir_in(&settings.tmp_dir)
+                    .unwrap(),
+                0,
+            )
+        });
         let mut remaining_files = files.len();
         let batches = files.chunks(settings.merge_batch_size);
         let mut batches = batches.into_iter();
-        while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 {
+        let mut temporary_files = vec![];
+        while remaining_files != 0 {
+            // Work around the fact that `Chunks` is not an `ExactSizeIterator`.
             remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
             let mut merger = merge_without_limit(batches.next().unwrap(), settings);
-            let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap();
-            merger.write_all_to(settings, &mut BufWriter::new(tmp_file));
-            batch_number += 1;
-        }
-        let batch_files = (0..batch_number).map(|n| {
-            Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap())
-                as Box<dyn Read + Send>
-        });
-        if batch_number > settings.merge_batch_size {
-            assert!(batches.next().is_none());
-            merge_with_file_limit(
-                Box::new(batch_files) as Box<dyn ExactSizeIterator<Item = Box<dyn Read + Send>>>,
-                settings,
-            )
-        } else {
-            let final_batch = batches.next();
-            assert!(batches.next().is_none());
-            merge_without_limit(
-                batch_files.chain(final_batch.into_iter().flatten()),
-                settings,
-            )
+            let mut tmp_file = Tmp::create(
+                tmp_dir.path().join(tmp_dir_size.to_string()),
+                settings.compress_prog.as_deref(),
+            );
+            tmp_dir_size += 1;
+            merger.write_all_to(settings, tmp_file.as_write());
+            temporary_files.push(tmp_file.finished_writing());
         }
+        assert!(batches.next().is_none());
+        merge_with_file_limit::<_, _, Tmp>(
+            temporary_files
+                .into_iter()
+                .map(Box::new(|c: Tmp::Closed| c.reopen())
+                    as Box<
+                        dyn FnMut(Tmp::Closed) -> <Tmp::Closed as ClosedTmpFile>::Reopened,
+                    >),
+            settings,
+            Some((tmp_dir, tmp_dir_size)),
+        )
     } else {
         merge_without_limit(files, settings)
     }
 }
 
-/// Merge files without limiting how many files are concurrently open
+/// Merge files without limiting how many files are concurrently open.
 ///
 /// It is the responsibility of the caller to ensure that `files` yields only
 /// as many files as we are allowed to open concurrently.
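+///
+/// Note that each open file gets a channel with two 8 KiB chunk buffers in
+/// flight (see below), so peak memory grows roughly with the number of files
+/// merged at once; `settings.merge_batch_size` is what bounds that number.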
-fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
+fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
     files: F,
     settings: &GlobalSettings,
 ) -> FileMerger {
@@ -83,16 +118,18 @@ fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
     for (file_number, file) in files.enumerate() {
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
-        reader_files.push(ReaderFile {
+        reader_files.push(Some(ReaderFile {
             file,
-            sender: Some(sender),
+            sender,
             carry_over: vec![],
-        });
+        }));
+        // Send the initial chunk to trigger a read for each file
         request_sender
             .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
             .unwrap();
     }
 
+    // Send the second chunk for each file
     for file_number in 0..reader_files.len() {
         request_sender
             .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
@@ -136,37 +173,45 @@
     }
 }
 /// The struct on the reader thread representing an input file
-struct ReaderFile {
-    file: Box<dyn Read + Send>,
-    sender: Option<SyncSender<Chunk>>,
+struct ReaderFile<M: MergeInput> {
+    file: M,
+    sender: SyncSender<Chunk>,
     carry_over: Vec<u8>,
 }
 
 /// The function running on the reader thread.
-fn reader(
+fn reader<M: MergeInput>(
     recycled_receiver: Receiver<(usize, Chunk)>,
-    files: &mut [ReaderFile],
+    files: &mut [Option<ReaderFile<M>>],
     settings: &GlobalSettings,
     separator: u8,
 ) {
     for (file_idx, chunk) in recycled_receiver.iter() {
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        let ReaderFile {
+        if let Some(ReaderFile {
             file,
             sender,
             carry_over,
-        } = &mut files[file_idx];
-        chunks::read(
-            sender,
-            recycled_buffer,
-            None,
-            carry_over,
-            file,
-            &mut iter::empty(),
-            separator,
-            recycled_lines,
-            settings,
-        );
+        }) = &mut files[file_idx]
+        {
+            let should_continue = chunks::read(
+                sender,
+                recycled_buffer,
+                None,
+                carry_over,
+                file.as_read(),
+                &mut iter::empty(),
+                separator,
+                recycled_lines,
+                settings,
+            );
+            if !should_continue {
+                // Remove the file from the list by replacing it with `None`.
+                let ReaderFile { file, .. } = files[file_idx].take().unwrap();
+                // Depending on the kind of the `MergeInput`, this may delete the file:
+                file.finished_reading();
+            }
+        }
     }
 }
 /// The struct on the main thread representing an input file
@@ -241,11 +286,14 @@ impl<'a> FileMerger<'a> {
                 self.heap.pop();
             }
         } else {
+            // This will cause the comparison to use a different line and the heap to readjust.
             self.heap.peek_mut().unwrap().line_idx += 1;
         }
 
         if let Some(prev) = prev {
             if let Ok(prev_chunk) = Rc::try_unwrap(prev.chunk) {
+                // If nothing is referencing the previous chunk anymore, this means that the previous line
+                // was the last line of the chunk. We can recycle the chunk.
                 self.request_sender
                     .send((prev.file_number, prev_chunk))
                     .ok();
@@ -273,7 +321,195 @@ impl<'a> Compare for FileComparator<'a> {
             // as lines from a file with a lower number are to be considered "earlier".
             cmp = a.file_number.cmp(&b.file_number);
         }
-        // Our BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
+        // BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
         cmp.reverse()
     }
 }
+
+// Wait for the child to exit and check its exit code.
+fn assert_child_success(mut child: Child, program: &str) {
+    if !matches!(
+        child.wait().map(|e| e.code()),
+        Ok(Some(0)) | Ok(None) | Err(_)
+    ) {
+        crash!(2, "'{}' terminated abnormally", program)
+    }
+}
+
+/// A temporary file that can be written to.
+pub trait WriteableTmpFile {
+    type Closed: ClosedTmpFile;
+    type InnerWrite: Write;
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self;
+    /// Closes the temporary file.
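+    ///
+    /// The intended lifecycle, sketched here with the plain (uncompressed)
+    /// variant (`path` is assumed to point into the sort's temp directory):
+    ///
+    /// ```ignore
+    /// let mut tmp = WriteablePlainTmpFile::create(path, None);
+    /// tmp.as_write().write_all(b"line\n").unwrap();
+    /// let closed = tmp.finished_writing();
+    /// // ...later, on the merging side:
+    /// let mut input = closed.reopen();
+    /// // read from input.as_read(), then clean up (this may delete the file):
+    /// input.finished_reading();
+    /// ```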
+    fn finished_writing(self) -> Self::Closed;
+    fn as_write(&mut self) -> &mut Self::InnerWrite;
+}
+/// A temporary file that is (temporarily) closed, but can be reopened.
+pub trait ClosedTmpFile {
+    type Reopened: MergeInput;
+    /// Reopens the temporary file.
+    fn reopen(self) -> Self::Reopened;
+}
+/// A pre-sorted input for merging.
+pub trait MergeInput: Send {
+    type InnerRead: Read;
+    /// Cleans this `MergeInput` up.
+    /// Implementations may delete the backing file.
+    fn finished_reading(self);
+    fn as_read(&mut self) -> &mut Self::InnerRead;
+}
+
+pub struct WriteablePlainTmpFile {
+    path: PathBuf,
+    file: BufWriter<File>,
+}
+pub struct ClosedPlainTmpFile {
+    path: PathBuf,
+}
+pub struct PlainTmpMergeInput {
+    path: PathBuf,
+    file: File,
+}
+impl WriteableTmpFile for WriteablePlainTmpFile {
+    type Closed = ClosedPlainTmpFile;
+    type InnerWrite = BufWriter<File>;
+
+    fn create(path: PathBuf, _: Option<&str>) -> Self {
+        WriteablePlainTmpFile {
+            file: BufWriter::new(File::create(&path).unwrap()),
+            path,
+        }
+    }
+
+    fn finished_writing(self) -> Self::Closed {
+        ClosedPlainTmpFile { path: self.path }
+    }
+
+    fn as_write(&mut self) -> &mut Self::InnerWrite {
+        &mut self.file
+    }
+}
+impl ClosedTmpFile for ClosedPlainTmpFile {
+    type Reopened = PlainTmpMergeInput;
+    fn reopen(self) -> Self::Reopened {
+        PlainTmpMergeInput {
+            file: File::open(&self.path).unwrap(),
+            path: self.path,
+        }
+    }
+}
+impl MergeInput for PlainTmpMergeInput {
+    type InnerRead = File;
+
+    fn finished_reading(self) {
+        fs::remove_file(self.path).ok();
+    }
+
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.file
+    }
+}
+
+pub struct WriteableCompressedTmpFile {
+    path: PathBuf,
+    compress_prog: String,
+    child: Child,
+    child_stdin: BufWriter<ChildStdin>,
+}
+pub struct ClosedCompressedTmpFile {
+    path: PathBuf,
+    compress_prog: String,
+}
+pub struct CompressedTmpMergeInput {
+    path: PathBuf,
+    compress_prog: String,
+    child: Child,
+    child_stdout: ChildStdout,
+}
+impl WriteableTmpFile for WriteableCompressedTmpFile {
+    type Closed = ClosedCompressedTmpFile;
+    type InnerWrite = BufWriter<ChildStdin>;
+
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self {
+        let compress_prog = compress_prog.unwrap();
+        let mut command = Command::new(compress_prog);
+        command
+            .stdin(Stdio::piped())
+            .stdout(File::create(&path).unwrap());
+        let mut child = crash_if_err!(
+            2,
+            command.spawn().map_err(|err| format!(
+                "couldn't execute compress program: errno {}",
+                err.raw_os_error().unwrap()
+            ))
+        );
+        let child_stdin = child.stdin.take().unwrap();
+        WriteableCompressedTmpFile {
+            path,
+            compress_prog: compress_prog.to_owned(),
+            child,
+            child_stdin: BufWriter::new(child_stdin),
+        }
+    }
+
+    fn finished_writing(self) -> Self::Closed {
+        drop(self.child_stdin);
+        assert_child_success(self.child, &self.compress_prog);
+        ClosedCompressedTmpFile {
+            path: self.path,
+            compress_prog: self.compress_prog,
+        }
+    }
+
+    fn as_write(&mut self) -> &mut Self::InnerWrite {
+        &mut self.child_stdin
+    }
+}
+impl ClosedTmpFile for ClosedCompressedTmpFile {
+    type Reopened = CompressedTmpMergeInput;
+
+    fn reopen(self) -> Self::Reopened {
+        let mut command = Command::new(&self.compress_prog);
+        let file = File::open(&self.path).unwrap();
+        command.stdin(file).stdout(Stdio::piped()).arg("-d");
+        let mut child = crash_if_err!(
+            2,
+            command.spawn().map_err(|err| format!(
+                "couldn't execute compress program: errno {}",
+                err.raw_os_error().unwrap()
+            ))
+        );
+        let child_stdout = child.stdout.take().unwrap();
+        CompressedTmpMergeInput {
+            path: self.path,
+            compress_prog: self.compress_prog,
+            child,
+            child_stdout,
+        }
+    }
+}
+impl MergeInput for CompressedTmpMergeInput {
+    type InnerRead = ChildStdout;
+
+    fn finished_reading(self) {
+        drop(self.child_stdout);
+        assert_child_success(self.child, &self.compress_prog);
+        fs::remove_file(self.path).ok();
+    }
+
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.child_stdout
+    }
+}
+
+pub struct PlainMergeInput<R: Read + Send> {
+    inner: R,
+}
+impl<R: Read + Send> MergeInput for PlainMergeInput<R> {
+    type InnerRead = R;
+    fn finished_reading(self) {}
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.inner
+    }
+}
diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index bc5048e11..4e865b208 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -236,7 +236,7 @@ impl Default for GlobalSettings {
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
             compress_prog: None,
-            merge_batch_size: 16,
+            merge_batch_size: 32,
         }
     }
 }
@@ -1313,7 +1313,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &
 
 fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
     if settings.merge {
-        let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings);
+        let mut file_merger = merge::merge(files.iter().map(open), settings);
         file_merger.write_all(settings);
     } else if settings.check {
         if files.len() > 1 {
@@ -1516,8 +1516,6 @@ fn get_hash<T: Hash>(t: &T) -> u64 {
 }
 
 fn random_shuffle(a: &str, b: &str, salt: &str) -> Ordering {
-    #![allow(clippy::comparison_chain)]
-
     let da = get_hash(&[a, salt].concat());
     let db = get_hash(&[b, salt].concat());
 
diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs
index d0af7a9c9..02d9fe92d 100644
--- a/tests/by-util/test_sort.rs
+++ b/tests/by-util/test_sort.rs
@@ -897,3 +897,19 @@ fn test_merge_batches() {
         .succeeds()
         .stdout_only_fixture("ext_sort.expected");
 }
+
+#[test]
+fn test_merge_batch_size() {
+    new_ucmd!()
+        .arg("--batch-size=2")
+        .arg("-m")
+        .arg("--unique")
+        .arg("merge_ints_interleaved_1.txt")
+        .arg("merge_ints_interleaved_2.txt")
+        .arg("merge_ints_interleaved_3.txt")
+        .arg("merge_ints_interleaved_3.txt")
+        .arg("merge_ints_interleaved_2.txt")
+        .arg("merge_ints_interleaved_1.txt")
+        .succeeds()
+        .stdout_only_fixture("merge_ints_interleaved.expected");
+}