From a4709c805c8468cc0c4f3d45d98d7548a8142148 Mon Sep 17 00:00:00 2001
From: Michael Debertol
Date: Sun, 1 Aug 2021 16:51:56 +0200
Subject: [PATCH] sort: use UResult

---
 src/uu/sort/src/check.rs    |  42 +++---
 src/uu/sort/src/chunks.rs   |  32 +++--
 src/uu/sort/src/ext_sort.rs |  64 +++++----
 src/uu/sort/src/merge.rs    | 186 +++++++++++++-----------
 src/uu/sort/src/sort.rs     | 273 +++++++++++++++++++++++++-----------
 5 files changed, 370 insertions(+), 227 deletions(-)

diff --git a/src/uu/sort/src/check.rs b/src/uu/sort/src/check.rs
index d82565c3d..5be752be0 100644
--- a/src/uu/sort/src/check.rs
+++ b/src/uu/sort/src/check.rs
@@ -9,7 +9,7 @@
 use crate::{
     chunks::{self, Chunk, RecycledChunk},
-    compare_by, open, GlobalSettings,
+    compare_by, open, GlobalSettings, SortError,
 };
 use itertools::Itertools;
 use std::{
@@ -20,13 +20,14 @@ use std::{
     sync::mpsc::{sync_channel, Receiver, SyncSender},
     thread,
 };
+use uucore::error::UResult;

 /// Check if the file at `path` is ordered.
 ///
 /// # Returns
 ///
 /// The code we should exit with.
-pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
+pub fn check(path: &OsStr, settings: &GlobalSettings) -> UResult<()> {
     let max_allowed_cmp = if settings.unique {
         // If `unique` is enabled, the previous line must compare _less_ to the next one.
         Ordering::Less
@@ -34,7 +35,7 @@ pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
         // Otherwise, the previous line must compare _less or equal_ to the next one.
         Ordering::Equal
     };
-    let file = open(path);
+    let file = open(path)?;
     let (recycled_sender, recycled_receiver) = sync_channel(2);
     let (loaded_sender, loaded_receiver) = sync_channel(2);
     thread::spawn({
@@ -69,15 +70,13 @@ pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
                 chunk.line_data(),
             ) > max_allowed_cmp
             {
-                if !settings.check_silent {
-                    eprintln!(
-                        "sort: {}:{}: disorder: {}",
-                        path.to_string_lossy(),
-                        line_idx,
-                        new_first.line
-                    );
+                return Err(SortError::Disorder {
+                    file: path.to_owned(),
+                    line_number: line_idx,
+                    line: new_first.line.to_owned(),
+                    silent: settings.check_silent,
                 }
-                return 1;
+                .into());
             }
             let _ = recycled_sender.send(prev_chunk.recycle());
         }
@@ -85,21 +84,19 @@
         for (a, b) in chunk.lines().iter().tuple_windows() {
             line_idx += 1;
             if compare_by(a, b, settings, chunk.line_data(), chunk.line_data()) > max_allowed_cmp {
-                if !settings.check_silent {
-                    eprintln!(
-                        "sort: {}:{}: disorder: {}",
-                        path.to_string_lossy(),
-                        line_idx,
-                        b.line
-                    );
+                return Err(SortError::Disorder {
+                    file: path.to_owned(),
+                    line_number: line_idx,
+                    line: b.line.to_owned(),
+                    silent: settings.check_silent,
                 }
-                return 1;
+                .into());
             }
         }
         prev_chunk = Some(chunk);
     }

-    0
+    Ok(())
 }

 /// The function running on the reader thread.
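A note on the pattern introduced above: the Disorder variant carries the `silent` flag with it, so the decision to print is made once, in the Display impl, rather than at every call site. The following self-contained sketch (plain std, hypothetical names, not the actual uucore API) shows how an error with an empty Display can still drive the nonzero exit code that `sort -C` relies on:

    use std::fmt::{self, Display};

    #[derive(Debug)]
    struct Disorder {
        file: String,
        line_number: usize,
        line: String,
        silent: bool,
    }

    impl Display for Disorder {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            if self.silent {
                // `sort -C`: signal via exit code only, print nothing.
                Ok(())
            } else {
                write!(f, "{}:{}: disorder: {}", self.file, self.line_number, self.line)
            }
        }
    }

    fn check() -> Result<(), Disorder> {
        Err(Disorder {
            file: "a.txt".into(),
            line_number: 3,
            line: "b".into(),
            silent: false,
        })
    }

    fn main() {
        if let Err(e) = check() {
            let msg = e.to_string();
            if !msg.is_empty() {
                eprintln!("sort: {}", msg);
            }
            std::process::exit(1); // disorder exits with code 1
        }
    }

The real exit-code mapping lives in the UCustomError impl added to sort.rs further down in this patch.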
@@ -108,7 +105,7 @@ fn reader(
     receiver: Receiver<RecycledChunk>,
     sender: SyncSender<Chunk>,
     settings: &GlobalSettings,
-) {
+) -> UResult<()> {
     let mut carry_over = vec![];
     for recycled_chunk in receiver.iter() {
         let should_continue = chunks::read(
@@ -124,9 +121,10 @@ fn reader(
                 b'\n'
             },
             settings,
-        );
+        )?;
         if !should_continue {
             break;
         }
     }
+    Ok(())
 }

diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs
index 5ab98392d..80d6060d4 100644
--- a/src/uu/sort/src/chunks.rs
+++ b/src/uu/sort/src/chunks.rs
@@ -14,8 +14,9 @@ use std::{

 use memchr::memchr_iter;
 use ouroboros::self_referencing;
+use uucore::error::{UResult, USimpleError};

-use crate::{numeric_str_cmp::NumInfo, GeneralF64ParseResult, GlobalSettings, Line};
+use crate::{numeric_str_cmp::NumInfo, GeneralF64ParseResult, GlobalSettings, Line, SortError};

 /// The chunk that is passed around between threads.
 /// `lines` consist of slices into `buffer`.
@@ -137,10 +138,10 @@ pub fn read(
     max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
     file: &mut T,
-    next_files: &mut impl Iterator<Item = T>,
+    next_files: &mut impl Iterator<Item = UResult<T>>,
     separator: u8,
     settings: &GlobalSettings,
-) -> bool {
+) -> UResult<bool> {
     let RecycledChunk {
         lines,
         selections,
@@ -159,12 +160,12 @@ pub fn read(
         max_buffer_size,
         carry_over.len(),
         separator,
-    );
+    )?;
     carry_over.clear();
     carry_over.extend_from_slice(&buffer[read..]);

     if read != 0 {
-        let payload = Chunk::new(buffer, |buffer| {
+        let payload: UResult<Chunk> = Chunk::try_new(buffer, |buffer| {
             let selections = unsafe {
                 // SAFETY: It is safe to transmute to an empty vector of selections with shorter lifetime.
                 // It was only temporarily transmuted to a Vec<Selection<'static>> to make recycling possible.
                 std::mem::transmute::<Vec<Selection<'static>>, Vec<Selection<'_>>>(selections)
             };
@@ -175,18 +176,19 @@ pub fn read(
                 // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
                 std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
             };
-            let read = crash_if_err!(2, std::str::from_utf8(&buffer[..read]));
+            let read = std::str::from_utf8(&buffer[..read])
+                .map_err(|error| SortError::Utf8Error { error })?;
             let mut line_data = LineData {
                 selections,
                 num_infos,
                 parsed_floats,
             };
             parse_lines(read, &mut lines, &mut line_data, separator, settings);
-            ChunkContents { lines, line_data }
+            Ok(ChunkContents { lines, line_data })
         });
-        sender.send(payload).unwrap();
+        sender.send(payload?).unwrap();
     }
-    should_continue
+    Ok(should_continue)
 }

 /// Split `read` into `Line`s, and add them to `lines`.
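The crash_if_err!(2, ...) on invalid UTF-8 becomes a plain map_err into the new error type, so the failure travels up through the channel instead of aborting the process. A minimal sketch of just that conversion (std only, hypothetical SortError reduced to one variant):

    use std::str::Utf8Error;

    #[derive(Debug)]
    enum SortError {
        Utf8Error { error: Utf8Error },
    }

    fn validate(buffer: &[u8]) -> Result<&str, SortError> {
        std::str::from_utf8(buffer).map_err(|error| SortError::Utf8Error { error })
    }

    fn main() {
        assert!(validate(b"hello\nworld\n").is_ok());
        assert!(validate(&[0xff, 0xfe]).is_err()); // invalid UTF-8 becomes a SortError
    }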
@@ -242,12 +244,12 @@ fn parse_lines<'a>(
 /// * Whether this function should be called again.
 fn read_to_buffer<T: Read>(
     file: &mut T,
-    next_files: &mut impl Iterator<Item = T>,
+    next_files: &mut impl Iterator<Item = UResult<T>>,
     buffer: &mut Vec<u8>,
     max_buffer_size: Option<usize>,
     start_offset: usize,
     separator: u8,
-) -> (usize, bool) {
+) -> UResult<(usize, bool)> {
     let mut read_target = &mut buffer[start_offset..];
     let mut last_file_target_size = read_target.len();
     loop {
@@ -274,7 +276,7 @@ fn read_to_buffer(
                     // We read enough lines.
                     let end = last_line_end.unwrap();
                     // We want to include the separator here, because it shouldn't be carried over.
-                    return (end + 1, true);
+                    return Ok((end + 1, true));
                 } else {
                     // We need to read more lines
                     let len = buffer.len();
@@ -299,11 +301,11 @@ fn read_to_buffer(
                     if let Some(next_file) = next_files.next() {
                         // There is another file.
                         last_file_target_size = leftover_len;
-                        *file = next_file;
+                        *file = next_file?;
                     } else {
                         // This was the last file.
                         let read_len = buffer.len() - leftover_len;
-                        return (read_len, false);
+                        return Ok((read_len, false));
                     }
                 }
             }
@@ -313,7 +315,7 @@ fn read_to_buffer(
             Err(e) if e.kind() == ErrorKind::Interrupted => {
                 // retry
             }
-            Err(e) => crash!(2, "{}", e),
+            Err(e) => return Err(USimpleError::new(2, e.to_string())),
         }
     }
 }

diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs
index 816bf1e1d..8ff5665fd 100644
--- a/src/uu/sort/src/ext_sort.rs
+++ b/src/uu/sort/src/ext_sort.rs
@@ -22,6 +22,7 @@ use std::{
 };

 use itertools::Itertools;
+use uucore::error::UResult;

 use crate::chunks::RecycledChunk;
 use crate::merge::ClosedTmpFile;
@@ -29,6 +30,7 @@ use crate::merge::WriteableCompressedTmpFile;
 use crate::merge::WriteablePlainTmpFile;
 use crate::merge::WriteableTmpFile;
 use crate::Output;
+use crate::SortError;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, sort_by, GlobalSettings,
@@ -40,10 +42,10 @@ const START_BUFFER_SIZE: usize = 8_000;

 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(
-    files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+    files: &mut impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
     settings: &GlobalSettings,
     output: Output,
-) {
+) -> UResult<()> {
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
     thread::spawn({
@@ -57,7 +59,7 @@ pub fn ext_sort(
             sorted_receiver,
             recycled_sender,
             output,
-        );
+        )
     } else {
         reader_writer::<_, WriteablePlainTmpFile>(
             files,
@@ -65,17 +67,20 @@ pub fn ext_sort(
             sorted_receiver,
             recycled_sender,
             output,
-        );
+        )
     }
 }

-fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile + 'static>(
+fn reader_writer<
+    F: Iterator<Item = UResult<Box<dyn Read + Send>>>,
+    Tmp: WriteableTmpFile + 'static,
+>(
     files: F,
     settings: &GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
     output: Output,
-) {
+) -> UResult<()> {
     let separator = if settings.zero_terminated {
         b'\0'
     } else {
@@ -93,16 +98,16 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
         settings,
         receiver,
         sender,
-    );
+    )?;
     match read_result {
         ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
             let tmp_dir_size = tmp_files.len();
-            let mut merger = merge::merge_with_file_limit::<_, _, Tmp>(
+            let merger = merge::merge_with_file_limit::<_, _, Tmp>(
                 tmp_files.into_iter().map(|c| c.reopen()),
                 settings,
                 Some((tmp_dir, tmp_dir_size)),
-            );
-            merger.write_all(settings, output);
+            )?;
+            merger.write_all(settings, output)?;
         }
         ReadResult::SortedSingleChunk(chunk) => {
             if settings.unique {
@@ -145,6 +150,7 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
             // don't output anything
         }
     }
+    Ok(())
 }

 /// The function that is executed on the sorter thread.
@@ -153,7 +159,11 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: Global
         payload.with_contents_mut(|contents| {
             sort_by(&mut contents.lines, &settings, &contents.line_data)
         });
-        sender.send(payload).unwrap();
+        if sender.send(payload).is_err() {
+            // The receiver has gone away, likely because the other thread hit an error.
+            // We stop silently because the actual error is printed by the other thread.
+            return;
+        }
     }
 }
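The sorter change above leans on a standard mpsc guarantee: send returns Err as soon as the receiving end has been dropped. A runnable sketch of this shutdown protocol (std only, illustrative names):

    use std::sync::mpsc::sync_channel;
    use std::thread;

    fn main() {
        let (sender, receiver) = sync_channel::<i32>(1);
        let sorter = thread::spawn(move || {
            for i in 0.. {
                if sender.send(i).is_err() {
                    // Receiver hung up, e.g. after an I/O error on its side:
                    // stop quietly; the error is reported by the other thread.
                    return;
                }
            }
        });
        assert_eq!(receiver.recv().unwrap(), 0);
        drop(receiver); // simulate the reader/writer thread bailing out early
        sorter.join().unwrap();
    }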
@@ -173,15 +183,15 @@ enum ReadResult<I: WriteableTmpFile> {
 }

 /// The function that is executed on the reader/writer thread.
 fn read_write_loop<I: WriteableTmpFile>(
-    mut files: impl Iterator<Item = Box<dyn Read + Send>>,
+    mut files: impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
     tmp_dir_parent: &Path,
     separator: u8,
     buffer_size: usize,
     settings: &GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> ReadResult<I> {
-    let mut file = files.next().unwrap();
+) -> UResult<ReadResult<I>> {
+    let mut file = files.next().unwrap()?;
     let mut carry_over = vec![];
     // kick things off with two reads
     for _ in 0..2 {
@@ -199,14 +209,14 @@ fn read_write_loop<I: WriteableTmpFile>(
             &mut files,
             separator,
            settings,
-        );
+        )?;
         if !should_continue {
            drop(sender);
             // We have already read the whole input. Since we are in our first two reads,
             // this means that we can fit the whole input into memory. Bypass writing below and
             // handle this case in a more straightforward way.
-            return if let Ok(first_chunk) = receiver.recv() {
+            return Ok(if let Ok(first_chunk) = receiver.recv() {
                 if let Ok(second_chunk) = receiver.recv() {
                     ReadResult::SortedTwoChunks([first_chunk, second_chunk])
                 } else {
                     ReadResult::SortedSingleChunk(first_chunk)
                 }
             } else {
                 ReadResult::EmptyInput
-            };
+            });
         }
     }

-    let tmp_dir = crash_if_err!(
-        2,
-        tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(tmp_dir_parent)
-    );
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("uutils_sort")
+        .tempdir_in(tmp_dir_parent)
+        .map_err(|_| SortError::TmpDirCreationFailed)?;

     let mut sender_option = Some(sender);
     let mut file_number = 0;
@@ -232,7 +240,7 @@ fn read_write_loop<I: WriteableTmpFile>(
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
             _ => {
-                return ReadResult::WroteChunksToFile { tmp_files, tmp_dir };
+                return Ok(ReadResult::WroteChunksToFile { tmp_files, tmp_dir });
             }
         };

@@ -241,7 +249,7 @@ fn read_write_loop<I: WriteableTmpFile>(
             tmp_dir.path().join(file_number.to_string()),
             settings.compress_prog.as_deref(),
             separator,
-        );
+        )?;
         tmp_files.push(tmp_file);
         file_number += 1;

@@ -258,7 +266,7 @@ fn read_write_loop<I: WriteableTmpFile>(
             &mut files,
             separator,
             settings,
-        );
+        )?;
         if !should_continue {
             sender_option = None;
         }
@@ -273,8 +281,8 @@ fn write<I: WriteableTmpFile>(
     file: PathBuf,
     compress_prog: Option<&str>,
     separator: u8,
-) -> I::Closed {
-    let mut tmp_file = I::create(file, compress_prog);
+) -> UResult<I::Closed> {
+    let mut tmp_file = I::create(file, compress_prog)?;
     write_lines(chunk.lines(), tmp_file.as_write(), separator);
     tmp_file.finished_writing()
 }
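For orientation, the reader/writer loop above is the heart of a classic external sort: sort chunks that fit in memory, spill them to temporary files, then merge. A miniature of the same two phases (sketch only; the real code streams chunks through threads and temp files and compares lines with compare_by, not integers):

    fn main() {
        let input = vec![5, 3, 8, 1, 9, 2, 7, 4, 6, 0];
        let chunk_size = 4; // stand-in for settings.buffer_size

        // Phase 1: sort each chunk independently (the sorter thread's job).
        let mut spilled: Vec<Vec<i32>> = input
            .chunks(chunk_size)
            .map(|c| {
                let mut c = c.to_vec();
                c.sort_unstable();
                c
            })
            .collect();

        // Phase 2: naive k-way merge of the sorted runs (FileMerger's job).
        let mut merged = Vec::new();
        while !spilled.is_empty() {
            let (idx, _) = spilled
                .iter()
                .enumerate()
                .min_by_key(|(_, run)| run[0])
                .unwrap();
            merged.push(spilled[idx].remove(0));
            if spilled[idx].is_empty() {
                spilled.remove(idx);
            }
        }
        assert_eq!(merged, (0..10).collect::<Vec<_>>());
    }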
diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs
index a5ac9411b..fad966f64 100644
--- a/src/uu/sort/src/merge.rs
+++ b/src/uu/sort/src/merge.rs
@@ -17,16 +17,17 @@ use std::{
     process::{Child, ChildStdin, ChildStdout, Command, Stdio},
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
-    thread,
+    thread::{self, JoinHandle},
 };

 use compare::Compare;
 use itertools::Itertools;
 use tempfile::TempDir;
+use uucore::error::UResult;

 use crate::{
     chunks::{self, Chunk, RecycledChunk},
-    compare_by, open, GlobalSettings, Output,
+    compare_by, open, GlobalSettings, Output, SortError,
 };

 /// If the output file occurs in the input files as well, copy the contents of the output file
@@ -35,7 +36,7 @@ fn replace_output_file_in_input_files(
     files: &mut [OsString],
     settings: &GlobalSettings,
     output: Option<&str>,
-) -> Option<(TempDir, usize)> {
+) -> UResult<Option<(TempDir, usize)>> {
     let mut copy: Option<(TempDir, PathBuf)> = None;
     if let Some(Ok(output_path)) = output.map(|path| Path::new(path).canonicalize()) {
         for file in files {
@@ -47,9 +48,10 @@ fn replace_output_file_in_input_files(
                     let tmp_dir = tempfile::Builder::new()
                         .prefix("uutils_sort")
                         .tempdir_in(&settings.tmp_dir)
-                        .unwrap();
+                        .map_err(|_| SortError::TmpDirCreationFailed)?;
                     let copy_path = tmp_dir.path().join("0");
-                    std::fs::copy(file_path, &copy_path).unwrap();
+                    std::fs::copy(file_path, &copy_path)
+                        .map_err(|error| SortError::OpenTmpFileFailed { error })?;
                     *file = copy_path.clone().into_os_string();
                     copy = Some((tmp_dir, copy_path))
                 }
@@ -58,7 +60,7 @@ fn replace_output_file_in_input_files(
         }
     }
     // if we created a TempDir its size must be one.
-    copy.map(|(dir, _copy)| (dir, 1))
+    Ok(copy.map(|(dir, _copy)| (dir, 1)))
 }

 /// Merge pre-sorted `Box<dyn Read>`s.
@@ -69,13 +71,13 @@
 pub fn merge<'a>(
     files: &mut [OsString],
     settings: &'a GlobalSettings,
     output: Option<&str>,
-) -> FileMerger<'a> {
-    let tmp_dir = replace_output_file_in_input_files(files, settings, output);
+) -> UResult<FileMerger<'a>> {
+    let tmp_dir = replace_output_file_in_input_files(files, settings, output)?;
     if settings.compress_prog.is_none() {
         merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
             files
                 .iter()
-                .map(|file| PlainMergeInput { inner: open(file) }),
+                .map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
             settings,
             tmp_dir,
         )
@@ -83,7 +85,7 @@ pub fn merge<'a>(
         merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
             files
                 .iter()
-                .map(|file| PlainMergeInput { inner: open(file) }),
+                .map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
             settings,
             tmp_dir,
         )
@@ -93,24 +95,25 @@ pub fn merge<'a>(
 // Merge already sorted `MergeInput`s.
 pub fn merge_with_file_limit<
     M: MergeInput + 'static,
-    F: ExactSizeIterator<Item = M>,
+    F: ExactSizeIterator<Item = UResult<M>>,
     Tmp: WriteableTmpFile + 'static,
 >(
     files: F,
     settings: &GlobalSettings,
     tmp_dir: Option<(TempDir, usize)>,
-) -> FileMerger {
+) -> UResult<FileMerger> {
     if files.len() > settings.merge_batch_size {
         // If we did not get a tmp_dir, create one.
-        let (tmp_dir, mut tmp_dir_size) = tmp_dir.unwrap_or_else(|| {
-            (
+        let (tmp_dir, mut tmp_dir_size) = match tmp_dir {
+            Some(x) => x,
+            None => (
                 tempfile::Builder::new()
                     .prefix("uutils_sort")
                     .tempdir_in(&settings.tmp_dir)
-                    .unwrap(),
+                    .map_err(|_| SortError::TmpDirCreationFailed)?,
                 0,
-            )
-        });
+            ),
+        };
         let mut remaining_files = files.len();
         let batches = files.chunks(settings.merge_batch_size);
         let mut batches = batches.into_iter();
@@ -118,14 +121,14 @@ pub fn merge_with_file_limit<
         while remaining_files != 0 {
             // Work around the fact that `Chunks` is not an `ExactSizeIterator`.
             remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
-            let mut merger = merge_without_limit(batches.next().unwrap(), settings);
+            let merger = merge_without_limit(batches.next().unwrap(), settings)?;
             let mut tmp_file = Tmp::create(
                 tmp_dir.path().join(tmp_dir_size.to_string()),
                 settings.compress_prog.as_deref(),
-            );
+            )?;
             tmp_dir_size += 1;
-            merger.write_all_to(settings, tmp_file.as_write());
-            temporary_files.push(tmp_file.finished_writing());
+            merger.write_all_to(settings, tmp_file.as_write())?;
+            temporary_files.push(tmp_file.finished_writing()?);
         }
         assert!(batches.next().is_none());
         merge_with_file_limit::<_, _, Tmp>(
@@ -133,7 +136,7 @@ pub fn merge_with_file_limit<
                 .into_iter()
                 .map(Box::new(|c: Tmp::Closed| c.reopen())
                     as Box<
-                        dyn FnMut(Tmp::Closed) -> <Tmp::Closed as ClosedTmpFile>::Reopened,
+                        dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
                     >),
             settings,
             Some((tmp_dir, tmp_dir_size)),
@@ -147,10 +150,10 @@ pub fn merge_with_file_limit<
 ///
 /// It is the responsibility of the caller to ensure that `files` yields only
 /// as many files as we are allowed to open concurrently.
-fn merge_without_limit<M: MergeInput, F: Iterator<Item = M>>(
+fn merge_without_limit<M: MergeInput, F: Iterator<Item = UResult<M>>>(
     files: F,
     settings: &GlobalSettings,
-) -> FileMerger {
+) -> UResult<FileMerger> {
     let (request_sender, request_receiver) = channel();
     let mut reader_files = Vec::with_capacity(files.size_hint().0);
     let mut loaded_receivers = Vec::with_capacity(files.size_hint().0);
@@ -158,7 +161,7 @@ fn merge_without_limit<M: MergeInput, F: Iterator<Item = M>>(
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
         reader_files.push(Some(ReaderFile {
-            file,
+            file: file?,
             sender,
             carry_over: vec![],
         }));
@@ -175,7 +178,7 @@ fn merge_without_limit<M: MergeInput, F: Iterator<Item = M>>(
             .unwrap();
     }

-    thread::spawn({
+    let reader_join_handle = thread::spawn({
        let settings = settings.clone();
        move || {
            reader(
@@ -204,14 +207,15 @@ fn merge_without_limit<M: MergeInput, F: Iterator<Item = M>>(
         }
     }

-    FileMerger {
+    Ok(FileMerger {
         heap: binary_heap_plus::BinaryHeap::from_vec_cmp(
             mergeable_files,
             FileComparator { settings },
         ),
         request_sender,
         prev: None,
-    }
+        reader_join_handle,
+    })
 }

 /// The struct on the reader thread representing an input file
 struct ReaderFile<M: MergeInput> {
@@ -226,7 +230,7 @@ fn reader(
     files: &mut [Option<ReaderFile<M>>],
     settings: &GlobalSettings,
     separator: u8,
-) {
+) -> UResult<()> {
     for (file_idx, recycled_chunk) in recycled_receiver.iter() {
         if let Some(ReaderFile {
             file,
@@ -243,15 +247,16 @@ fn reader(
                 &mut iter::empty(),
                 separator,
                 settings,
-            );
+            )?;
             if !should_continue {
                 // Remove the file from the list by replacing it with `None`.
                 let ReaderFile { file, .. } = files[file_idx].take().unwrap();
                 // Depending on the kind of the `MergeInput`, this may delete the file:
-                file.finished_reading();
+                file.finished_reading()?;
             }
         }
     }
+    Ok(())
 }

 /// The struct on the main thread representing an input file
 pub struct MergeableFile {
@@ -275,17 +280,20 @@ pub struct FileMerger<'a> {
     heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
     request_sender: Sender<(usize, RecycledChunk)>,
     prev: Option<PreviousLine>,
+    reader_join_handle: JoinHandle<UResult<()>>,
 }

 impl<'a> FileMerger<'a> {
     /// Write the merged contents to the output file.
-    pub fn write_all(&mut self, settings: &GlobalSettings, output: Output) {
+    pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
         let mut out = output.into_write();
-        self.write_all_to(settings, &mut out);
+        self.write_all_to(settings, &mut out)
     }

-    pub fn write_all_to(&mut self, settings: &GlobalSettings, out: &mut impl Write) {
+    pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
         while self.write_next(settings, out) {}
+        drop(self.request_sender);
+        self.reader_join_handle.join().unwrap()
     }

     fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool {
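write_all/write_all_to now consume the merger so that, after the last line is written, the request channel can be hung up and the reader thread joined, surfacing the reader's UResult instead of discarding it. A standalone sketch of the pattern (std only; a plain Result stands in for uucore's UResult):

    use std::sync::mpsc::channel;
    use std::thread;

    type UResult<T> = Result<T, String>; // stand-in for uucore::error::UResult

    fn main() -> UResult<()> {
        let (request_sender, request_receiver) = channel::<u32>();
        let reader_join_handle = thread::spawn(move || -> UResult<()> {
            // The reader serves requests until every sender is gone...
            while request_receiver.recv().is_ok() {}
            Ok(()) // ...and reports its own failures through the return value.
        });
        request_sender.send(1).map_err(|e| e.to_string())?;
        drop(request_sender); // hang up, so the reader's loop ends
        reader_join_handle.join().unwrap() // propagate the reader's UResult
    }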
@@ -369,36 +377,41 @@ impl<'a> Compare<MergeableFile> for FileComparator<'a> {
 }

 // Wait for the child to exit and check its exit code.
-fn assert_child_success(mut child: Child, program: &str) {
+fn check_child_success(mut child: Child, program: &str) -> UResult<()> {
     if !matches!(
         child.wait().map(|e| e.code()),
         Ok(Some(0)) | Ok(None) | Err(_)
     ) {
-        crash!(2, "'{}' terminated abnormally", program)
+        Err(SortError::CompressProgTerminatedAbnormally {
+            prog: program.to_owned(),
+        }
+        .into())
+    } else {
+        Ok(())
     }
 }

 /// A temporary file that can be written to.
-pub trait WriteableTmpFile {
+pub trait WriteableTmpFile: Sized {
     type Closed: ClosedTmpFile;
     type InnerWrite: Write;
-    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self;
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> UResult<Self>;
     /// Closes the temporary file.
-    fn finished_writing(self) -> Self::Closed;
+    fn finished_writing(self) -> UResult<Self::Closed>;
     fn as_write(&mut self) -> &mut Self::InnerWrite;
 }
 /// A temporary file that is (temporarily) closed, but can be reopened.
 pub trait ClosedTmpFile {
     type Reopened: MergeInput;
     /// Reopens the temporary file.
-    fn reopen(self) -> Self::Reopened;
+    fn reopen(self) -> UResult<Self::Reopened>;
 }
 /// A pre-sorted input for merging.
 pub trait MergeInput: Send {
     type InnerRead: Read;
     /// Cleans this `MergeInput` up.
     /// Implementations may delete the backing file.
-    fn finished_reading(self);
+    fn finished_reading(self) -> UResult<()>;
     fn as_read(&mut self) -> &mut Self::InnerRead;
 }

@@ -417,15 +430,17 @@ impl WriteableTmpFile for WriteablePlainTmpFile {
     type Closed = ClosedPlainTmpFile;
     type InnerWrite = BufWriter<File>;

-    fn create(path: PathBuf, _: Option<&str>) -> Self {
-        WriteablePlainTmpFile {
-            file: BufWriter::new(File::create(&path).unwrap()),
+    fn create(path: PathBuf, _: Option<&str>) -> UResult<Self> {
+        Ok(WriteablePlainTmpFile {
+            file: BufWriter::new(
+                File::create(&path).map_err(|error| SortError::OpenTmpFileFailed { error })?,
+            ),
             path,
-        }
+        })
     }

-    fn finished_writing(self) -> Self::Closed {
-        ClosedPlainTmpFile { path: self.path }
+    fn finished_writing(self) -> UResult<Self::Closed> {
+        Ok(ClosedPlainTmpFile { path: self.path })
     }

     fn as_write(&mut self) -> &mut Self::InnerWrite {
@@ -434,18 +449,22 @@ impl WriteableTmpFile for WriteablePlainTmpFile {
 }
 impl ClosedTmpFile for ClosedPlainTmpFile {
     type Reopened = PlainTmpMergeInput;
-    fn reopen(self) -> Self::Reopened {
-        PlainTmpMergeInput {
-            file: File::open(&self.path).unwrap(),
+    fn reopen(self) -> UResult<Self::Reopened> {
+        Ok(PlainTmpMergeInput {
+            file: File::open(&self.path).map_err(|error| SortError::OpenTmpFileFailed { error })?,
             path: self.path,
-        }
+        })
     }
 }
 impl MergeInput for PlainTmpMergeInput {
     type InnerRead = File;
-    fn finished_reading(self) {
-        fs::remove_file(self.path).ok();
+    fn finished_reading(self) -> UResult<()> {
+        // we ignore failures to delete the temporary file,
+        // because there is a race at the end of the execution and the whole
+        // temporary directory might already be gone.
+        let _ = fs::remove_file(self.path);
+        Ok(())
     }

     fn as_read(&mut self) -> &mut Self::InnerRead {
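On the `let _ = fs::remove_file(...)` above: deletion races against the TempDir destructor, which may have removed the whole directory already, so the failure is deliberately not propagated. A small illustration (std only, hypothetical path):

    use std::fs;

    fn main() {
        let path = std::env::temp_dir().join("uutils_sort_sketch_already_gone");
        // Nothing exists at `path`, mirroring the case where the temporary
        // directory was dropped first and took the file with it.
        let _ = fs::remove_file(&path); // Err(NotFound) is deliberately ignored
    }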
@@ -473,35 +492,33 @@ impl WriteableTmpFile for WriteableCompressedTmpFile {
     type Closed = ClosedCompressedTmpFile;
     type InnerWrite = BufWriter<ChildStdin>;

-    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self {
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> UResult<Self> {
         let compress_prog = compress_prog.unwrap();
         let mut command = Command::new(compress_prog);
-        command
-            .stdin(Stdio::piped())
-            .stdout(File::create(&path).unwrap());
-        let mut child = crash_if_err!(
-            2,
-            command.spawn().map_err(|err| format!(
-                "couldn't execute compress program: errno {}",
-                err.raw_os_error().unwrap()
-            ))
-        );
+        let tmp_file =
+            File::create(&path).map_err(|error| SortError::OpenTmpFileFailed { error })?;
+        command.stdin(Stdio::piped()).stdout(tmp_file);
+        let mut child = command
+            .spawn()
+            .map_err(|err| SortError::CompressProgExecutionFailed {
+                code: err.raw_os_error().unwrap(),
+            })?;
         let child_stdin = child.stdin.take().unwrap();
-        WriteableCompressedTmpFile {
+        Ok(WriteableCompressedTmpFile {
             path,
             compress_prog: compress_prog.to_owned(),
             child,
             child_stdin: BufWriter::new(child_stdin),
-        }
+        })
     }

-    fn finished_writing(self) -> Self::Closed {
+    fn finished_writing(self) -> UResult<Self::Closed> {
         drop(self.child_stdin);
-        assert_child_success(self.child, &self.compress_prog);
-        ClosedCompressedTmpFile {
+        check_child_success(self.child, &self.compress_prog)?;
+        Ok(ClosedCompressedTmpFile {
             path: self.path,
             compress_prog: self.compress_prog,
-        }
+        })
     }

     fn as_write(&mut self) -> &mut Self::InnerWrite {
@@ -511,33 +528,32 @@ impl WriteableTmpFile for WriteableCompressedTmpFile {
 }
 impl ClosedTmpFile for ClosedCompressedTmpFile {
     type Reopened = CompressedTmpMergeInput;
-    fn reopen(self) -> Self::Reopened {
+    fn reopen(self) -> UResult<Self::Reopened> {
         let mut command = Command::new(&self.compress_prog);
         let file = File::open(&self.path).unwrap();
         command.stdin(file).stdout(Stdio::piped()).arg("-d");
-        let mut child = crash_if_err!(
-            2,
-            command.spawn().map_err(|err| format!(
-                "couldn't execute compress program: errno {}",
-                err.raw_os_error().unwrap()
-            ))
-        );
+        let mut child = command
+            .spawn()
+            .map_err(|err| SortError::CompressProgExecutionFailed {
+                code: err.raw_os_error().unwrap(),
+            })?;
         let child_stdout = child.stdout.take().unwrap();
-        CompressedTmpMergeInput {
+        Ok(CompressedTmpMergeInput {
             path: self.path,
             compress_prog: self.compress_prog,
             child,
             child_stdout,
-        }
+        })
     }
 }
 impl MergeInput for CompressedTmpMergeInput {
     type InnerRead = ChildStdout;
-    fn finished_reading(self) {
+    fn finished_reading(self) -> UResult<()> {
         drop(self.child_stdout);
-        assert_child_success(self.child, &self.compress_prog);
-        fs::remove_file(self.path).ok();
+        check_child_success(self.child, &self.compress_prog)?;
+        let _ = fs::remove_file(self.path);
+        Ok(())
     }

     fn as_read(&mut self) -> &mut Self::InnerRead {
@@ -550,7 +566,9 @@ pub struct PlainMergeInput<R: Read + Send> {
 }
 impl<R: Read + Send> MergeInput for PlainMergeInput<R> {
     type InnerRead = R;
-    fn finished_reading(self) {}
+    fn finished_reading(self) -> UResult<()> {
+        Ok(())
+    }
     fn as_read(&mut self) -> &mut Self::InnerRead {
         &mut self.inner
     }
 }

diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index 77cc4e9e9..be9510aef 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -33,14 +33,18 @@ use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use std::cmp::Ordering;
 use std::env;
+use std::error::Error;
 use std::ffi::{OsStr, OsString};
+use std::fmt::Display;
 use std::fs::{File, OpenOptions};
 use std::hash::{Hash, Hasher};
 use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Write};
 use std::ops::Range;
 use std::path::Path;
 use std::path::PathBuf;
+use std::str::Utf8Error;
 use unicode_width::UnicodeWidthStr;
+use uucore::error::{set_exit_code, UCustomError, UResult, USimpleError, UUsageError};
 use uucore::parse_size::{parse_size, ParseSizeError};
 use uucore::version_cmp::version_cmp;
 use uucore::InvalidEncodingHandling;
@@ -120,6 +124,111 @@ const POSITIVE: char = '+';
 // available memory into consideration, instead of relying on this constant only.
 const DEFAULT_BUF_SIZE: usize = 1_000_000_000; // 1 GB

+#[derive(Debug)]
+enum SortError {
+    Disorder {
+        file: OsString,
+        line_number: usize,
+        line: String,
+        silent: bool,
+    },
+    OpenFailed {
+        path: String,
+        error: std::io::Error,
+    },
+    ReadFailed {
+        path: String,
+        error: std::io::Error,
+    },
+    ParseKeyError {
+        key: String,
+        msg: String,
+    },
+    OpenTmpFileFailed {
+        error: std::io::Error,
+    },
+    CompressProgExecutionFailed {
+        code: i32,
+    },
+    CompressProgTerminatedAbnormally {
+        prog: String,
+    },
+    TmpDirCreationFailed,
+    Utf8Error {
+        error: Utf8Error,
+    },
+}
+
+impl Error for SortError {}
+
+impl UCustomError for SortError {
+    fn code(&self) -> i32 {
+        match self {
+            SortError::Disorder { .. } => 1,
+            _ => 2,
+        }
+    }
+
+    fn usage(&self) -> bool {
+        false
+    }
+}
+
+impl Display for SortError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SortError::Disorder {
+                file,
+                line_number,
+                line,
+                silent,
+            } => {
+                if !silent {
+                    write!(
+                        f,
+                        "{}:{}: disorder: {}",
+                        file.to_string_lossy(),
+                        line_number,
+                        line
+                    )
+                } else {
+                    Ok(())
+                }
+            }
+            SortError::OpenFailed { path, error } => write!(
+                f,
+                "open failed: {}: {}",
+                path,
+                strip_errno(&error.to_string())
+            ),
+            SortError::ParseKeyError { key, msg } => {
+                write!(f, "failed to parse key `{}`: {}", key, msg)
+            }
+            SortError::ReadFailed { path, error } => write!(
+                f,
+                "cannot read: {}: {}",
+                path,
+                strip_errno(&error.to_string())
+            ),
+            SortError::OpenTmpFileFailed { error } => {
+                write!(
+                    f,
+                    "failed to open temporary file: {}",
+                    strip_errno(&error.to_string())
+                )
+            }
+            SortError::CompressProgExecutionFailed { code } => {
+                write!(f, "couldn't execute compress program: errno {}", code)
+            }
+            SortError::CompressProgTerminatedAbnormally { prog } => {
+                write!(f, "'{}' terminated abnormally", prog)
+            }
+            SortError::TmpDirCreationFailed => write!(f, "could not create temporary directory"),
+            SortError::Utf8Error { error } => write!(f, "{}", error),
+        }
+    }
+}
+
 #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy, Debug)]
 enum SortMode {
     Numeric,
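The UCustomError impl above is the one place where errors are mapped to exit codes: disorder reports 1, everything else 2, matching GNU sort. A reduced sketch of that mapping, detached from uucore (hypothetical two-variant enum):

    #[derive(Debug)]
    enum SortError {
        Disorder,             // `sort -c/-C` found unsorted input
        TmpDirCreationFailed, // any other failure
    }

    fn exit_code(e: &SortError) -> i32 {
        match e {
            SortError::Disorder => 1, // GNU sort: disorder is 1
            _ => 2,                   // everything else is 2
        }
    }

    fn main() {
        assert_eq!(exit_code(&SortError::Disorder), 1);
        assert_eq!(exit_code(&SortError::TmpDirCreationFailed), 2);
    }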
@@ -150,23 +259,23 @@ pub struct Output {
 }

 impl Output {
-    fn new(name: Option<&str>) -> Self {
-        Self {
-            file: name.map(|name| {
-                // This is different from `File::create()` because we don't truncate the output yet.
-                // This allows using the output file as an input file.
-                (
-                    name.to_owned(),
-                    OpenOptions::new()
-                        .write(true)
-                        .create(true)
-                        .open(name)
-                        .unwrap_or_else(|e| {
-                            crash!(2, "open failed: {}: {}", name, strip_errno(&e.to_string()))
-                        }),
-                )
-            }),
-        }
+    fn new(name: Option<&str>) -> UResult<Self> {
+        let file = if let Some(name) = name {
+            // This is different from `File::create()` because we don't truncate the output yet.
+            // This allows using the output file as an input file.
+            let file = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(name)
+                .map_err(|e| SortError::OpenFailed {
+                    path: name.to_owned(),
+                    error: e,
+                })?;
+            Some((name.to_owned(), file))
+        } else {
+            None
+        };
+        Ok(Self { file })
     }

     fn into_write(self) -> BufWriter<Box<dyn Write>> {
@@ -724,33 +833,37 @@ impl FieldSelector {
         }
     }

-    fn parse(key: &str, global_settings: &GlobalSettings) -> Self {
+    fn parse(key: &str, global_settings: &GlobalSettings) -> UResult<Self> {
         let mut from_to = key.split(',');
         let (from, from_options) = Self::split_key_options(from_to.next().unwrap());
         let to = from_to.next().map(|to| Self::split_key_options(to));
         let options_are_empty = from_options.is_empty() && matches!(to, None | Some((_, "")));
-        crash_if_err!(
-            2,
-            if options_are_empty {
-                // Inherit the global settings if there are no options attached to this key.
-                (|| {
-                    // This would be ideal for a try block, I think. In the meantime this closure allows
-                    // to use the `?` operator here.
-                    Self::new(
-                        KeyPosition::new(from, 1, global_settings.ignore_leading_blanks)?,
-                        to.map(|(to, _)| {
-                            KeyPosition::new(to, 0, global_settings.ignore_leading_blanks)
-                        })
-                        .transpose()?,
-                        KeySettings::from(global_settings),
-                    )
-                })()
-            } else {
-                // Do not inherit from `global_settings`, as there are options attached to this key.
-                Self::parse_with_options((from, from_options), to)
+
+        if options_are_empty {
+            // Inherit the global settings if there are no options attached to this key.
+            (|| {
+                // This would be ideal for a try block, I think. In the meantime this closure allows
+                // to use the `?` operator here.
+                Self::new(
+                    KeyPosition::new(from, 1, global_settings.ignore_leading_blanks)?,
+                    to.map(|(to, _)| {
+                        KeyPosition::new(to, 0, global_settings.ignore_leading_blanks)
+                    })
+                    .transpose()?,
+                    KeySettings::from(global_settings),
+                )
+            })()
+        } else {
+            // Do not inherit from `global_settings`, as there are options attached to this key.
+            Self::parse_with_options((from, from_options), to)
+        }
+        .map_err(|msg| {
+            SortError::ParseKeyError {
+                key: key.to_owned(),
+                msg,
             }
-            .map_err(|e| format!("failed to parse key `{}`: {}", key, e))
-        )
+            .into()
+        })
     }

     fn parse_with_options(
@@ -962,7 +1075,8 @@ fn make_sort_mode_arg<'a, 'b>(mode: &'a str, short: &'b str, help: &'b str) -> A
     arg
 }

-pub fn uumain(args: impl uucore::Args) -> i32 {
+#[uucore_procs::gen_uumain]
+pub fn uumain(args: impl uucore::Args) -> UResult<()> {
     let args = args
         .collect_str(InvalidEncodingHandling::Ignore)
         .accept_any();
@@ -979,11 +1093,11 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
             // (clap returns 1).
             if e.use_stderr() {
                 eprintln!("{}", e.message);
-                return 2;
+                set_exit_code(2);
             } else {
                 println!("{}", e.message);
-                return 0;
             }
+            return Ok(());
         }
     };

@@ -998,7 +1112,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         let mut files = Vec::new();
         for path in &files0_from {
-            let reader = open(&path);
+            let reader = open(&path)?;
             let buf_reader = BufReader::new(reader);
             for line in buf_reader.split(b'\0').flatten() {
                 files.push(OsString::from(
@@ -1055,12 +1169,14 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         env::set_var("RAYON_NUM_THREADS", &settings.threads);
     }

-    settings.buffer_size = matches
-        .value_of(options::BUF_SIZE)
-        .map_or(DEFAULT_BUF_SIZE, |s| {
-            GlobalSettings::parse_byte_count(s)
-                .unwrap_or_else(|e| crash!(2, "{}", format_error_message(e, s, options::BUF_SIZE)))
-        });
+    settings.buffer_size =
+        matches
+            .value_of(options::BUF_SIZE)
+            .map_or(Ok(DEFAULT_BUF_SIZE), |s| {
+                GlobalSettings::parse_byte_count(s).map_err(|e| {
+                    USimpleError::new(2, format_error_message(e, s, options::BUF_SIZE))
+                })
+            })?;

     settings.tmp_dir = matches
         .value_of(options::TMP_DIR)
@@ -1070,9 +1186,9 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
     settings.compress_prog = matches.value_of(options::COMPRESS_PROG).map(String::from);

     if let Some(n_merge) = matches.value_of(options::BATCH_SIZE) {
-        settings.merge_batch_size = n_merge
-            .parse()
-            .unwrap_or_else(|_| crash!(2, "invalid --batch-size argument '{}'", n_merge));
+        settings.merge_batch_size = n_merge.parse().map_err(|_| {
+            UUsageError::new(2, format!("invalid --batch-size argument '{}'", n_merge))
+        })?;
     }

     settings.zero_terminated = matches.is_present(options::ZERO_TERMINATED);
@@ -1101,11 +1217,13 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
         /* if no file, default to stdin */
         files.push("-".to_string().into());
     } else if settings.check && files.len() != 1 {
-        crash!(
+        return Err(UUsageError::new(
             2,
-            "extra operand `{}' not allowed with -c",
-            files[1].to_string_lossy()
-        )
+            format!(
+                "extra operand `{}' not allowed with -c",
+                files[1].to_string_lossy()
+            ),
+        ));
     }

     if let Some(arg) = matches.args.get(options::SEPARATOR) {
         let separator = arg.vals[0].to_string_lossy();
         let mut separator = separator.as_ref();
         if separator == "\\0" {
             separator = "\0";
         }
         if separator.len() != 1 {
-            crash!(2, "separator must be exactly one character long");
+            return Err(UUsageError::new(
+                2,
+                "separator must be exactly one character long".into(),
+            ));
         }
         settings.separator = Some(separator.chars().next().unwrap())
     }

     if let Some(values) = matches.values_of(options::KEY) {
         for value in values {
-            let selector = FieldSelector::parse(value, &settings);
+            let selector = FieldSelector::parse(value, &settings)?;
             if selector.settings.mode == SortMode::Random && settings.salt.is_none() {
                 settings.salt = Some(get_rand_string());
             }
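With #[uucore_procs::gen_uumain], uumain returns UResult<()> and the generated wrapper takes over printing and exiting; set_exit_code covers the paths that must not report through an error value. A rough sketch of what such a wrapper does (an assumption for illustration, not the actual macro expansion):

    type UResult<T> = Result<T, Box<dyn CustomError>>;

    trait CustomError: std::fmt::Display + std::fmt::Debug {
        fn code(&self) -> i32 {
            1
        }
    }

    fn uumain() -> UResult<()> {
        Ok(()) // the real uumain parses args and may return Err(...)
    }

    fn main() {
        match uumain() {
            Ok(()) => {}
            Err(e) => {
                let msg = e.to_string();
                if !msg.is_empty() {
                    eprintln!("sort: {}", msg); // prefix with the utility name
                }
                std::process::exit(e.code());
            }
        }
    }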
@@ -1152,10 +1273,10 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
     // and to reopen them at a later point. This is different from how the output file is handled,
     // probably to prevent running out of file descriptors.
     for file in &files {
-        open(file);
+        open(file)?;
     }

-    let output = Output::new(matches.value_of(options::OUTPUT));
+    let output = Output::new(matches.value_of(options::OUTPUT))?;

     settings.init_precomputed();

@@ -1382,21 +1503,20 @@ pub fn uu_app() -> App<'static, 'static> {
         )
 }

-fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> i32 {
+fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UResult<()> {
     if settings.merge {
-        let mut file_merger = merge::merge(files, settings, output.as_output_name());
-        file_merger.write_all(settings, output);
+        let file_merger = merge::merge(files, settings, output.as_output_name())?;
+        file_merger.write_all(settings, output)
     } else if settings.check {
         if files.len() > 1 {
-            crash!(2, "only one file allowed with -c");
+            Err(UUsageError::new(2, "only one file allowed with -c".into()))
+        } else {
+            check::check(files.first().unwrap(), settings)
         }
-        return check::check(files.first().unwrap(), settings);
     } else {
         let mut lines = files.iter().map(open);
-
-        ext_sort(&mut lines, settings, output);
+        ext_sort(&mut lines, settings, output)
     }
-    0
 }

 fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings, line_data: &LineData<'a>) {
@@ -1692,25 +1812,22 @@ fn strip_errno(err: &str) -> &str {
     &err[..err.find(" (os error ").unwrap_or(err.len())]
 }

-fn open(path: impl AsRef<OsStr>) -> Box<dyn Read + Send> {
+fn open(path: impl AsRef<OsStr>) -> UResult<Box<dyn Read + Send>> {
     let path = path.as_ref();
     if path == "-" {
         let stdin = stdin();
-        return Box::new(stdin) as Box<dyn Read + Send>;
+        return Ok(Box::new(stdin) as Box<dyn Read + Send>);
     }

     let path = Path::new(path);
     match File::open(path) {
-        Ok(f) => Box::new(f) as Box<dyn Read + Send>,
-        Err(e) => {
-            crash!(
-                2,
-                "cannot read: {0}: {1}",
-                path.to_string_lossy(),
-                strip_errno(&e.to_string())
-            );
+        Ok(f) => Ok(Box::new(f) as Box<dyn Read + Send>),
+        Err(error) => Err(SortError::ReadFailed {
+            path: path.to_string_lossy().to_string(),
+            error,
         }
+        .into()),
     }
 }