
sort: use UResult

Michael Debertol 2021-08-01 16:51:56 +02:00
parent e8eb15f05e
commit a4709c805c
5 changed files with 370 additions and 227 deletions
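
The change this commit makes everywhere is the same: functions that printed an error and returned an `i32` exit code (or aborted via `crash!`) now return `UResult<()>` and bubble a typed `SortError` up to `uumain`. In miniature, with std-only stand-ins for uucore's `UResult`/`UCustomError` (the names `check_old`/`check_new` are illustrative, not from the diff):

use std::error::Error;
use std::fmt::{self, Display};

#[derive(Debug)]
enum SortError {
    Disorder { line_number: usize },
}

impl Display for SortError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SortError::Disorder { line_number } => {
                write!(f, "disorder on line {}", line_number)
            }
        }
    }
}

impl Error for SortError {}

// Before: print and return the exit code in-line.
fn check_old(sorted: bool) -> i32 {
    if !sorted {
        eprintln!("sort: disorder on line 7");
        return 1;
    }
    0
}

// After: return a typed error; the exit status is derived from the
// error (via `UCustomError::code` in the real commit) at the top level.
fn check_new(sorted: bool) -> Result<(), SortError> {
    if !sorted {
        return Err(SortError::Disorder { line_number: 7 });
    }
    Ok(())
}

fn main() {
    assert_eq!(check_old(false), 1);
    assert!(check_new(false).is_err());
}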

View file

@@ -9,7 +9,7 @@
use crate::{
chunks::{self, Chunk, RecycledChunk},
compare_by, open, GlobalSettings,
compare_by, open, GlobalSettings, SortError,
};
use itertools::Itertools;
use std::{
@@ -20,13 +20,14 @@ use std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread,
};
use uucore::error::UResult;
/// Check if the file at `path` is ordered.
///
/// # Returns
///
/// The code we should exit with.
pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
pub fn check(path: &OsStr, settings: &GlobalSettings) -> UResult<()> {
let max_allowed_cmp = if settings.unique {
// If `unique` is enabled, the previous line must compare _less_ to the next one.
Ordering::Less
@@ -34,7 +35,7 @@ pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
// Otherwise, the previous line must compare _less or equal_ to the next one.
Ordering::Equal
};
let file = open(path);
let file = open(path)?;
let (recycled_sender, recycled_receiver) = sync_channel(2);
let (loaded_sender, loaded_receiver) = sync_channel(2);
thread::spawn({
@@ -69,15 +70,13 @@ pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
chunk.line_data(),
) > max_allowed_cmp
{
if !settings.check_silent {
eprintln!(
"sort: {}:{}: disorder: {}",
path.to_string_lossy(),
line_idx,
new_first.line
);
return Err(SortError::Disorder {
file: path.to_owned(),
line_number: line_idx,
line: new_first.line.to_owned(),
silent: settings.check_silent,
}
return 1;
.into());
}
let _ = recycled_sender.send(prev_chunk.recycle());
}
@@ -85,21 +84,19 @@ pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
for (a, b) in chunk.lines().iter().tuple_windows() {
line_idx += 1;
if compare_by(a, b, settings, chunk.line_data(), chunk.line_data()) > max_allowed_cmp {
if !settings.check_silent {
eprintln!(
"sort: {}:{}: disorder: {}",
path.to_string_lossy(),
line_idx,
b.line
);
return Err(SortError::Disorder {
file: path.to_owned(),
line_number: line_idx,
line: b.line.to_owned(),
silent: settings.check_silent,
}
return 1;
.into());
}
}
prev_chunk = Some(chunk);
}
0
Ok(())
}
/// The function running on the reader thread.
@@ -108,7 +105,7 @@ fn reader(
receiver: Receiver<RecycledChunk>,
sender: SyncSender<Chunk>,
settings: &GlobalSettings,
) {
) -> UResult<()> {
let mut carry_over = vec![];
for recycled_chunk in receiver.iter() {
let should_continue = chunks::read(
@@ -124,9 +121,10 @@ fn reader(
b'\n'
},
settings,
);
)?;
if !should_continue {
break;
}
}
Ok(())
}
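
Note how `check` folds the old `check_silent` branch into the error itself: `SortError::Disorder` carries a `silent` flag, its `Display` impl (defined later in this diff) prints nothing when the flag is set, and `UCustomError::code` still maps the error to exit status 1. A reduced sketch of that message/status split (`Disorder` here is a simplified stand-in):

use std::fmt;

struct Disorder {
    line: String,
    silent: bool, // true for a quiet check such as `sort -C`
}

impl fmt::Display for Disorder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.silent {
            Ok(()) // no message, but the caller still sees an error value
        } else {
            write!(f, "disorder: {}", self.line)
        }
    }
}

fn main() {
    let quiet = Disorder { line: "b".into(), silent: true };
    let loud = Disorder { line: "b".into(), silent: false };
    assert_eq!(quiet.to_string(), "");
    assert_eq!(loud.to_string(), "disorder: b");
}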

View file

@@ -14,8 +14,9 @@ use std::{
use memchr::memchr_iter;
use ouroboros::self_referencing;
use uucore::error::{UResult, USimpleError};
use crate::{numeric_str_cmp::NumInfo, GeneralF64ParseResult, GlobalSettings, Line};
use crate::{numeric_str_cmp::NumInfo, GeneralF64ParseResult, GlobalSettings, Line, SortError};
/// The chunk that is passed around between threads.
/// `lines` consist of slices into `buffer`.
@@ -137,10 +138,10 @@ pub fn read<T: Read>(
max_buffer_size: Option<usize>,
carry_over: &mut Vec<u8>,
file: &mut T,
next_files: &mut impl Iterator<Item = T>,
next_files: &mut impl Iterator<Item = UResult<T>>,
separator: u8,
settings: &GlobalSettings,
) -> bool {
) -> UResult<bool> {
let RecycledChunk {
lines,
selections,
@@ -159,12 +160,12 @@
max_buffer_size,
carry_over.len(),
separator,
);
)?;
carry_over.clear();
carry_over.extend_from_slice(&buffer[read..]);
if read != 0 {
let payload = Chunk::new(buffer, |buffer| {
let payload: UResult<Chunk> = Chunk::try_new(buffer, |buffer| {
let selections = unsafe {
// SAFETY: It is safe to transmute to an empty vector of selections with shorter lifetime.
// It was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
@@ -175,18 +176,19 @@
// because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
};
let read = crash_if_err!(2, std::str::from_utf8(&buffer[..read]));
let read = std::str::from_utf8(&buffer[..read])
.map_err(|error| SortError::Uft8Error { error })?;
let mut line_data = LineData {
selections,
num_infos,
parsed_floats,
};
parse_lines(read, &mut lines, &mut line_data, separator, settings);
ChunkContents { lines, line_data }
Ok(ChunkContents { lines, line_data })
});
sender.send(payload).unwrap();
sender.send(payload?).unwrap();
}
should_continue
Ok(should_continue)
}
/// Split `read` into `Line`s, and add them to `lines`.
@@ -242,12 +244,12 @@ fn parse_lines<'a>(
/// * Whether this function should be called again.
fn read_to_buffer<T: Read>(
file: &mut T,
next_files: &mut impl Iterator<Item = T>,
next_files: &mut impl Iterator<Item = UResult<T>>,
buffer: &mut Vec<u8>,
max_buffer_size: Option<usize>,
start_offset: usize,
separator: u8,
) -> (usize, bool) {
) -> UResult<(usize, bool)> {
let mut read_target = &mut buffer[start_offset..];
let mut last_file_target_size = read_target.len();
loop {
@@ -274,7 +276,7 @@
// We read enough lines.
let end = last_line_end.unwrap();
// We want to include the separator here, because it shouldn't be carried over.
return (end + 1, true);
return Ok((end + 1, true));
} else {
// We need to read more lines
let len = buffer.len();
@@ -299,11 +301,11 @@
if let Some(next_file) = next_files.next() {
// There is another file.
last_file_target_size = leftover_len;
*file = next_file;
*file = next_file?;
} else {
// This was the last file.
let read_len = buffer.len() - leftover_len;
return (read_len, false);
return Ok((read_len, false));
}
}
}
@@ -313,7 +315,7 @@ fn read_to_buffer<T: Read>(
Err(e) if e.kind() == ErrorKind::Interrupted => {
// retry
}
Err(e) => crash!(2, "{}", e),
Err(e) => return Err(USimpleError::new(2, e.to_string())),
}
}
}
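
`Chunk` is a self-referencing struct built with `ouroboros` (note the `self_referencing` import above), which is why construction switches from the generated `new` to the generated `try_new`: the builder closure now returns a `Result`, letting the UTF-8 check fail cleanly instead of calling `crash_if_err!`. A stripped-down sketch of the same shape (`TextChunk` is illustrative; attribute syntax per recent ouroboros versions):

use ouroboros::self_referencing;

#[self_referencing]
struct TextChunk {
    buffer: Vec<u8>,
    #[borrows(buffer)]
    #[covariant]
    text: &'this str, // borrows from `buffer`, like `lines` above
}

fn make_chunk(buffer: Vec<u8>) -> Result<TextChunk, std::str::Utf8Error> {
    // `try_new` is the fallible sibling of the generated `new`: if the
    // builder closure returns Err, construction fails with that error.
    TextChunk::try_new(buffer, |buffer| std::str::from_utf8(buffer))
}

fn main() {
    assert!(make_chunk(b"ok".to_vec()).is_ok());
    assert!(make_chunk(vec![0xff]).is_err());
}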

View file

@@ -22,6 +22,7 @@ use std::{
};
use itertools::Itertools;
use uucore::error::UResult;
use crate::chunks::RecycledChunk;
use crate::merge::ClosedTmpFile;
@@ -29,6 +30,7 @@ use crate::merge::WriteableCompressedTmpFile;
use crate::merge::WriteablePlainTmpFile;
use crate::merge::WriteableTmpFile;
use crate::Output;
use crate::SortError;
use crate::{
chunks::{self, Chunk},
compare_by, merge, sort_by, GlobalSettings,
@@ -40,10 +42,10 @@ const START_BUFFER_SIZE: usize = 8_000;
/// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
pub fn ext_sort(
files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
files: &mut impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
settings: &GlobalSettings,
output: Output,
) {
) -> UResult<()> {
let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
thread::spawn({
@@ -57,7 +59,7 @@ pub fn ext_sort(
sorted_receiver,
recycled_sender,
output,
);
)
} else {
reader_writer::<_, WriteablePlainTmpFile>(
files,
@@ -65,17 +67,20 @@ pub fn ext_sort(
sorted_receiver,
recycled_sender,
output,
);
)
}
}
fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile + 'static>(
fn reader_writer<
F: Iterator<Item = UResult<Box<dyn Read + Send>>>,
Tmp: WriteableTmpFile + 'static,
>(
files: F,
settings: &GlobalSettings,
receiver: Receiver<Chunk>,
sender: SyncSender<Chunk>,
output: Output,
) {
) -> UResult<()> {
let separator = if settings.zero_terminated {
b'\0'
} else {
@@ -93,16 +98,16 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
settings,
receiver,
sender,
);
)?;
match read_result {
ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
let tmp_dir_size = tmp_files.len();
let mut merger = merge::merge_with_file_limit::<_, _, Tmp>(
let merger = merge::merge_with_file_limit::<_, _, Tmp>(
tmp_files.into_iter().map(|c| c.reopen()),
settings,
Some((tmp_dir, tmp_dir_size)),
);
merger.write_all(settings, output);
)?;
merger.write_all(settings, output)?;
}
ReadResult::SortedSingleChunk(chunk) => {
if settings.unique {
@@ -145,6 +150,7 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
// don't output anything
}
}
Ok(())
}
/// The function that is executed on the sorter thread.
@@ -153,7 +159,11 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: Global
payload.with_contents_mut(|contents| {
sort_by(&mut contents.lines, &settings, &contents.line_data)
});
sender.send(payload).unwrap();
if sender.send(payload).is_err() {
// The receiver has gone away, likely because the other thread hit an error.
// We stop silently because the actual error is printed by the other thread.
return;
}
}
}
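
The `sorter` change above is a common shutdown convention for threaded pipelines: a failed `send` means the consumer hung up, here because the reader/writer thread returned early with an error, so the producer stops quietly and lets the other side report the failure. A self-contained demonstration:

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    let (sender, receiver) = sync_channel::<u64>(1);
    let producer = thread::spawn(move || {
        for i in 0.. {
            if sender.send(i).is_err() {
                // Receiver hung up; exit quietly instead of panicking.
                return;
            }
        }
    });
    // Simulate the consuming thread bailing out after an error:
    drop(receiver);
    producer.join().unwrap();
}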
@@ -173,15 +183,15 @@ enum ReadResult<I: WriteableTmpFile> {
}
/// The function that is executed on the reader/writer thread.
fn read_write_loop<I: WriteableTmpFile>(
mut files: impl Iterator<Item = Box<dyn Read + Send>>,
mut files: impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
tmp_dir_parent: &Path,
separator: u8,
buffer_size: usize,
settings: &GlobalSettings,
receiver: Receiver<Chunk>,
sender: SyncSender<Chunk>,
) -> ReadResult<I> {
let mut file = files.next().unwrap();
) -> UResult<ReadResult<I>> {
let mut file = files.next().unwrap()?;
let mut carry_over = vec![];
// kick things off with two reads
@@ -199,14 +209,14 @@ fn read_write_loop<I: WriteableTmpFile>(
&mut files,
separator,
settings,
);
)?;
if !should_continue {
drop(sender);
// We have already read the whole input. Since we are in our first two reads,
// this means that we can fit the whole input into memory. Bypass writing below and
// handle this case in a more straightforward way.
return if let Ok(first_chunk) = receiver.recv() {
return Ok(if let Ok(first_chunk) = receiver.recv() {
if let Ok(second_chunk) = receiver.recv() {
ReadResult::SortedTwoChunks([first_chunk, second_chunk])
} else {
@@ -214,16 +224,14 @@
}
} else {
ReadResult::EmptyInput
};
});
}
}
let tmp_dir = crash_if_err!(
2,
tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(tmp_dir_parent)
);
let tmp_dir = tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(tmp_dir_parent)
.map_err(|_| SortError::TmpDirCreationFailed)?;
let mut sender_option = Some(sender);
let mut file_number = 0;
@@ -232,7 +240,7 @@
let mut chunk = match receiver.recv() {
Ok(it) => it,
_ => {
return ReadResult::WroteChunksToFile { tmp_files, tmp_dir };
return Ok(ReadResult::WroteChunksToFile { tmp_files, tmp_dir });
}
};
@@ -241,7 +249,7 @@
tmp_dir.path().join(file_number.to_string()),
settings.compress_prog.as_deref(),
separator,
);
)?;
tmp_files.push(tmp_file);
file_number += 1;
@@ -258,7 +266,7 @@
&mut files,
separator,
settings,
);
)?;
if !should_continue {
sender_option = None;
}
@@ -273,8 +281,8 @@ fn write<I: WriteableTmpFile>(
file: PathBuf,
compress_prog: Option<&str>,
separator: u8,
) -> I::Closed {
let mut tmp_file = I::create(file, compress_prog);
) -> UResult<I::Closed> {
let mut tmp_file = I::create(file, compress_prog)?;
write_lines(chunk.lines(), tmp_file.as_write(), separator);
tmp_file.finished_writing()
}

View file

@@ -17,16 +17,17 @@ use std::{
process::{Child, ChildStdin, ChildStdout, Command, Stdio},
rc::Rc,
sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
thread,
thread::{self, JoinHandle},
};
use compare::Compare;
use itertools::Itertools;
use tempfile::TempDir;
use uucore::error::UResult;
use crate::{
chunks::{self, Chunk, RecycledChunk},
compare_by, open, GlobalSettings, Output,
compare_by, open, GlobalSettings, Output, SortError,
};
/// If the output file occurs in the input files as well, copy the contents of the output file
@@ -35,7 +36,7 @@ fn replace_output_file_in_input_files(
files: &mut [OsString],
settings: &GlobalSettings,
output: Option<&str>,
) -> Option<(TempDir, usize)> {
) -> UResult<Option<(TempDir, usize)>> {
let mut copy: Option<(TempDir, PathBuf)> = None;
if let Some(Ok(output_path)) = output.map(|path| Path::new(path).canonicalize()) {
for file in files {
@@ -47,9 +48,10 @@ fn replace_output_file_in_input_files(
let tmp_dir = tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(&settings.tmp_dir)
.unwrap();
.map_err(|_| SortError::TmpDirCreationFailed)?;
let copy_path = tmp_dir.path().join("0");
std::fs::copy(file_path, &copy_path).unwrap();
std::fs::copy(file_path, &copy_path)
.map_err(|error| SortError::OpenTmpFileFailed { error })?;
*file = copy_path.clone().into_os_string();
copy = Some((tmp_dir, copy_path))
}
@@ -58,7 +60,7 @@ fn replace_output_file_in_input_files(
}
}
// if we created a TempDir its size must be one.
copy.map(|(dir, _copy)| (dir, 1))
Ok(copy.map(|(dir, _copy)| (dir, 1)))
}
/// Merge pre-sorted `Box<dyn Read>`s.
@@ -69,13 +71,13 @@ pub fn merge<'a>(
files: &mut [OsString],
settings: &'a GlobalSettings,
output: Option<&str>,
) -> FileMerger<'a> {
let tmp_dir = replace_output_file_in_input_files(files, settings, output);
) -> UResult<FileMerger<'a>> {
let tmp_dir = replace_output_file_in_input_files(files, settings, output)?;
if settings.compress_prog.is_none() {
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
files
.iter()
.map(|file| PlainMergeInput { inner: open(file) }),
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
settings,
tmp_dir,
)
@@ -83,7 +85,7 @@ pub fn merge<'a>(
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
files
.iter()
.map(|file| PlainMergeInput { inner: open(file) }),
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
settings,
tmp_dir,
)
@@ -93,24 +95,25 @@ pub fn merge<'a>(
// Merge already sorted `MergeInput`s.
pub fn merge_with_file_limit<
M: MergeInput + 'static,
F: ExactSizeIterator<Item = M>,
F: ExactSizeIterator<Item = UResult<M>>,
Tmp: WriteableTmpFile + 'static,
>(
files: F,
settings: &GlobalSettings,
tmp_dir: Option<(TempDir, usize)>,
) -> FileMerger {
) -> UResult<FileMerger> {
if files.len() > settings.merge_batch_size {
// If we did not get a tmp_dir, create one.
let (tmp_dir, mut tmp_dir_size) = tmp_dir.unwrap_or_else(|| {
(
let (tmp_dir, mut tmp_dir_size) = match tmp_dir {
Some(x) => x,
None => (
tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(&settings.tmp_dir)
.unwrap(),
.map_err(|_| SortError::TmpDirCreationFailed)?,
0,
)
});
),
};
let mut remaining_files = files.len();
let batches = files.chunks(settings.merge_batch_size);
let mut batches = batches.into_iter();
@@ -118,14 +121,14 @@ pub fn merge_with_file_limit<
while remaining_files != 0 {
// Work around the fact that `Chunks` is not an `ExactSizeIterator`.
remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
let mut merger = merge_without_limit(batches.next().unwrap(), settings);
let merger = merge_without_limit(batches.next().unwrap(), settings)?;
let mut tmp_file = Tmp::create(
tmp_dir.path().join(tmp_dir_size.to_string()),
settings.compress_prog.as_deref(),
);
)?;
tmp_dir_size += 1;
merger.write_all_to(settings, tmp_file.as_write());
temporary_files.push(tmp_file.finished_writing());
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
}
assert!(batches.next().is_none());
merge_with_file_limit::<_, _, Tmp>(
@@ -133,7 +136,7 @@ pub fn merge_with_file_limit<
.into_iter()
.map(Box::new(|c: Tmp::Closed| c.reopen())
as Box<
dyn FnMut(Tmp::Closed) -> <Tmp::Closed as ClosedTmpFile>::Reopened,
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
>),
settings,
Some((tmp_dir, tmp_dir_size)),
@@ -147,10 +150,10 @@ pub fn merge_with_file_limit<
///
/// It is the responsibility of the caller to ensure that `files` yields only
/// as many files as we are allowed to open concurrently.
fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = UResult<M>>>(
files: F,
settings: &GlobalSettings,
) -> FileMerger {
) -> UResult<FileMerger> {
let (request_sender, request_receiver) = channel();
let mut reader_files = Vec::with_capacity(files.size_hint().0);
let mut loaded_receivers = Vec::with_capacity(files.size_hint().0);
@@ -158,7 +161,7 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
let (sender, receiver) = sync_channel(2);
loaded_receivers.push(receiver);
reader_files.push(Some(ReaderFile {
file,
file: file?,
sender,
carry_over: vec![],
}));
@@ -175,7 +178,7 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
.unwrap();
}
thread::spawn({
let reader_join_handle = thread::spawn({
let settings = settings.clone();
move || {
reader(
@@ -204,14 +207,15 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
}
}
FileMerger {
Ok(FileMerger {
heap: binary_heap_plus::BinaryHeap::from_vec_cmp(
mergeable_files,
FileComparator { settings },
),
request_sender,
prev: None,
}
reader_join_handle,
})
}
/// The struct on the reader thread representing an input file
struct ReaderFile<M: MergeInput> {
@@ -226,7 +230,7 @@ fn reader(
files: &mut [Option<ReaderFile<impl MergeInput>>],
settings: &GlobalSettings,
separator: u8,
) {
) -> UResult<()> {
for (file_idx, recycled_chunk) in recycled_receiver.iter() {
if let Some(ReaderFile {
file,
@@ -243,15 +247,16 @@ fn reader(
&mut iter::empty(),
separator,
settings,
);
)?;
if !should_continue {
// Remove the file from the list by replacing it with `None`.
let ReaderFile { file, .. } = files[file_idx].take().unwrap();
// Depending on the kind of the `MergeInput`, this may delete the file:
file.finished_reading();
file.finished_reading()?;
}
}
}
Ok(())
}
/// The struct on the main thread representing an input file
pub struct MergeableFile {
@@ -275,17 +280,20 @@ pub struct FileMerger<'a> {
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
request_sender: Sender<(usize, RecycledChunk)>,
prev: Option<PreviousLine>,
reader_join_handle: JoinHandle<UResult<()>>,
}
impl<'a> FileMerger<'a> {
/// Write the merged contents to the output file.
pub fn write_all(&mut self, settings: &GlobalSettings, output: Output) {
pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
let mut out = output.into_write();
self.write_all_to(settings, &mut out);
self.write_all_to(settings, &mut out)
}
pub fn write_all_to(&mut self, settings: &GlobalSettings, out: &mut impl Write) {
pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
while self.write_next(settings, out) {}
drop(self.request_sender);
self.reader_join_handle.join().unwrap()
}
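
`write_all` and `write_all_to` now consume `self`: they drop `request_sender` so the reader thread can finish, then join the new `reader_join_handle` to surface the reader's `UResult` instead of losing it on a detached thread. The same idiom in isolation (the error string is made up for the demo):

use std::thread;

fn merge_and_report() -> Result<(), String> {
    let reader: thread::JoinHandle<Result<(), String>> =
        thread::spawn(|| Err("simulated read failure".to_owned()));
    // ... the main thread would write merged output here ...
    // Joining surfaces the worker's result instead of losing it.
    reader.join().expect("reader thread panicked")
}

fn main() {
    assert!(merge_and_report().is_err());
}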
fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool {
@@ -369,36 +377,41 @@ impl<'a> Compare<MergeableFile> for FileComparator<'a> {
}
// Wait for the child to exit and check its exit code.
fn assert_child_success(mut child: Child, program: &str) {
fn check_child_success(mut child: Child, program: &str) -> UResult<()> {
if !matches!(
child.wait().map(|e| e.code()),
Ok(Some(0)) | Ok(None) | Err(_)
) {
crash!(2, "'{}' terminated abnormally", program)
Err(SortError::CompressProgTerminatedAbnormally {
prog: program.to_owned(),
}
.into())
} else {
Ok(())
}
}
/// A temporary file that can be written to.
pub trait WriteableTmpFile {
pub trait WriteableTmpFile: Sized {
type Closed: ClosedTmpFile;
type InnerWrite: Write;
fn create(path: PathBuf, compress_prog: Option<&str>) -> Self;
fn create(path: PathBuf, compress_prog: Option<&str>) -> UResult<Self>;
/// Closes the temporary file.
fn finished_writing(self) -> Self::Closed;
fn finished_writing(self) -> UResult<Self::Closed>;
fn as_write(&mut self) -> &mut Self::InnerWrite;
}
/// A temporary file that is (temporarily) closed, but can be reopened.
pub trait ClosedTmpFile {
type Reopened: MergeInput;
/// Reopens the temporary file.
fn reopen(self) -> Self::Reopened;
fn reopen(self) -> UResult<Self::Reopened>;
}
/// A pre-sorted input for merging.
pub trait MergeInput: Send {
type InnerRead: Read;
/// Cleans this `MergeInput` up.
/// Implementations may delete the backing file.
fn finished_reading(self);
fn finished_reading(self) -> UResult<()>;
fn as_read(&mut self) -> &mut Self::InnerRead;
}
@@ -417,15 +430,17 @@ impl WriteableTmpFile for WriteablePlainTmpFile {
type Closed = ClosedPlainTmpFile;
type InnerWrite = BufWriter<File>;
fn create(path: PathBuf, _: Option<&str>) -> Self {
WriteablePlainTmpFile {
file: BufWriter::new(File::create(&path).unwrap()),
fn create(path: PathBuf, _: Option<&str>) -> UResult<Self> {
Ok(WriteablePlainTmpFile {
file: BufWriter::new(
File::create(&path).map_err(|error| SortError::OpenTmpFileFailed { error })?,
),
path,
}
})
}
fn finished_writing(self) -> Self::Closed {
ClosedPlainTmpFile { path: self.path }
fn finished_writing(self) -> UResult<Self::Closed> {
Ok(ClosedPlainTmpFile { path: self.path })
}
fn as_write(&mut self) -> &mut Self::InnerWrite {
@@ -434,18 +449,22 @@ impl WriteableTmpFile for WriteablePlainTmpFile {
}
impl ClosedTmpFile for ClosedPlainTmpFile {
type Reopened = PlainTmpMergeInput;
fn reopen(self) -> Self::Reopened {
PlainTmpMergeInput {
file: File::open(&self.path).unwrap(),
fn reopen(self) -> UResult<Self::Reopened> {
Ok(PlainTmpMergeInput {
file: File::open(&self.path).map_err(|error| SortError::OpenTmpFileFailed { error })?,
path: self.path,
}
})
}
}
impl MergeInput for PlainTmpMergeInput {
type InnerRead = File;
fn finished_reading(self) {
fs::remove_file(self.path).ok();
fn finished_reading(self) -> UResult<()> {
// we ignore failures to delete the temporary file,
// because there is a race at the end of the execution and the whole
// temporary directory might already be gone.
let _ = fs::remove_file(self.path);
Ok(())
}
fn as_read(&mut self) -> &mut Self::InnerRead {
@@ -473,35 +492,33 @@ impl WriteableTmpFile for WriteableCompressedTmpFile {
type Closed = ClosedCompressedTmpFile;
type InnerWrite = BufWriter<ChildStdin>;
fn create(path: PathBuf, compress_prog: Option<&str>) -> Self {
fn create(path: PathBuf, compress_prog: Option<&str>) -> UResult<Self> {
let compress_prog = compress_prog.unwrap();
let mut command = Command::new(compress_prog);
command
.stdin(Stdio::piped())
.stdout(File::create(&path).unwrap());
let mut child = crash_if_err!(
2,
command.spawn().map_err(|err| format!(
"couldn't execute compress program: errno {}",
err.raw_os_error().unwrap()
))
);
let tmp_file =
File::create(&path).map_err(|error| SortError::OpenTmpFileFailed { error })?;
command.stdin(Stdio::piped()).stdout(tmp_file);
let mut child = command
.spawn()
.map_err(|err| SortError::CompressProgExecutionFailed {
code: err.raw_os_error().unwrap(),
})?;
let child_stdin = child.stdin.take().unwrap();
WriteableCompressedTmpFile {
Ok(WriteableCompressedTmpFile {
path,
compress_prog: compress_prog.to_owned(),
child,
child_stdin: BufWriter::new(child_stdin),
}
})
}
fn finished_writing(self) -> Self::Closed {
fn finished_writing(self) -> UResult<Self::Closed> {
drop(self.child_stdin);
assert_child_success(self.child, &self.compress_prog);
ClosedCompressedTmpFile {
check_child_success(self.child, &self.compress_prog)?;
Ok(ClosedCompressedTmpFile {
path: self.path,
compress_prog: self.compress_prog,
}
})
}
fn as_write(&mut self) -> &mut Self::InnerWrite {
@@ -511,33 +528,32 @@ impl WriteableTmpFile for WriteableCompressedTmpFile {
impl ClosedTmpFile for ClosedCompressedTmpFile {
type Reopened = CompressedTmpMergeInput;
fn reopen(self) -> Self::Reopened {
fn reopen(self) -> UResult<Self::Reopened> {
let mut command = Command::new(&self.compress_prog);
let file = File::open(&self.path).unwrap();
command.stdin(file).stdout(Stdio::piped()).arg("-d");
let mut child = crash_if_err!(
2,
command.spawn().map_err(|err| format!(
"couldn't execute compress program: errno {}",
err.raw_os_error().unwrap()
))
);
let mut child = command
.spawn()
.map_err(|err| SortError::CompressProgExecutionFailed {
code: err.raw_os_error().unwrap(),
})?;
let child_stdout = child.stdout.take().unwrap();
CompressedTmpMergeInput {
Ok(CompressedTmpMergeInput {
path: self.path,
compress_prog: self.compress_prog,
child,
child_stdout,
}
})
}
}
impl MergeInput for CompressedTmpMergeInput {
type InnerRead = ChildStdout;
fn finished_reading(self) {
fn finished_reading(self) -> UResult<()> {
drop(self.child_stdout);
assert_child_success(self.child, &self.compress_prog);
fs::remove_file(self.path).ok();
check_child_success(self.child, &self.compress_prog)?;
let _ = fs::remove_file(self.path);
Ok(())
}
fn as_read(&mut self) -> &mut Self::InnerRead {
@@ -550,7 +566,9 @@ pub struct PlainMergeInput<R: Read + Send> {
}
impl<R: Read + Send> MergeInput for PlainMergeInput<R> {
type InnerRead = R;
fn finished_reading(self) {}
fn finished_reading(self) -> UResult<()> {
Ok(())
}
fn as_read(&mut self) -> &mut Self::InnerRead {
&mut self.inner
}
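
For orientation, `merge_with_file_limit` caps how many inputs are open at once: batches of at most `merge_batch_size` inputs are merged into temporary files, and the procedure recurses on those temporaries. The control flow, reduced to in-memory vectors (`kway` stands in for `FileMerger` and its `binary_heap_plus` heap; all names here are illustrative):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Heap-based k-way merge of pre-sorted inputs.
fn kway(inputs: Vec<Vec<u32>>) -> Vec<u32> {
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, idx, 0)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, idx, pos))) = heap.pop() {
        out.push(value);
        if let Some(&next) = inputs[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    out
}

// Merge in batches while there are too many inputs, then recurse.
fn merge_with_batch_limit(inputs: Vec<Vec<u32>>, batch_size: usize) -> Vec<u32> {
    if inputs.len() <= batch_size {
        return kway(inputs);
    }
    let reduced: Vec<Vec<u32>> = inputs
        .chunks(batch_size)
        .map(|batch| kway(batch.to_vec()))
        .collect();
    merge_with_batch_limit(reduced, batch_size)
}

fn main() {
    let merged = merge_with_batch_limit(vec![vec![1, 4], vec![2, 5], vec![3]], 2);
    assert_eq!(merged, vec![1, 2, 3, 4, 5]);
}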

View file

@@ -33,14 +33,18 @@ use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::cmp::Ordering;
use std::env;
use std::error::Error;
use std::ffi::{OsStr, OsString};
use std::fmt::Display;
use std::fs::{File, OpenOptions};
use std::hash::{Hash, Hasher};
use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Write};
use std::ops::Range;
use std::path::Path;
use std::path::PathBuf;
use std::str::Utf8Error;
use unicode_width::UnicodeWidthStr;
use uucore::error::{set_exit_code, UCustomError, UResult, USimpleError, UUsageError};
use uucore::parse_size::{parse_size, ParseSizeError};
use uucore::version_cmp::version_cmp;
use uucore::InvalidEncodingHandling;
@@ -120,6 +124,111 @@ const POSITIVE: char = '+';
// available memory into consideration, instead of relying on this constant only.
const DEFAULT_BUF_SIZE: usize = 1_000_000_000; // 1 GB
#[derive(Debug)]
enum SortError {
Disorder {
file: OsString,
line_number: usize,
line: String,
silent: bool,
},
OpenFailed {
path: String,
error: std::io::Error,
},
ReadFailed {
path: String,
error: std::io::Error,
},
ParseKeyError {
key: String,
msg: String,
},
OpenTmpFileFailed {
error: std::io::Error,
},
CompressProgExecutionFailed {
code: i32,
},
CompressProgTerminatedAbnormally {
prog: String,
},
TmpDirCreationFailed,
Uft8Error {
error: Utf8Error,
},
}
impl Error for SortError {}
impl UCustomError for SortError {
fn code(&self) -> i32 {
match self {
SortError::Disorder { .. } => 1,
_ => 2,
}
}
fn usage(&self) -> bool {
false
}
}
impl Display for SortError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SortError::Disorder {
file,
line_number,
line,
silent,
} => {
if !silent {
write!(
f,
"{}:{}: disorder: {}",
file.to_string_lossy(),
line_number,
line
)
} else {
Ok(())
}
}
SortError::OpenFailed { path, error } => write!(
f,
"open failed: {}: {}",
path,
strip_errno(&error.to_string())
),
SortError::ParseKeyError { key, msg } => {
write!(f, "failed to parse key `{}`: {}", key, msg)
}
SortError::ReadFailed { path, error } => write!(
f,
"cannot read: {}: {}",
path,
strip_errno(&error.to_string())
),
SortError::OpenTmpFileFailed { error } => {
write!(
f,
"failed to open temporary file: {}",
strip_errno(&error.to_string())
)
}
SortError::CompressProgExecutionFailed { code } => {
write!(f, "couldn't execute compress program: errno {}", code)
}
SortError::CompressProgTerminatedAbnormally { prog } => {
write!(f, "'{}' terminated abnormally", prog)
}
SortError::TmpDirCreationFailed => write!(f, "could not create temporary directory"),
SortError::Uft8Error { error } => write!(f, "{}", error),
}
}
}
#[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy, Debug)]
enum SortMode {
Numeric,
@@ -150,23 +259,23 @@ pub struct Output {
}
impl Output {
fn new(name: Option<&str>) -> Self {
Self {
file: name.map(|name| {
// This is different from `File::create()` because we don't truncate the output yet.
// This allows using the output file as an input file.
(
name.to_owned(),
OpenOptions::new()
.write(true)
.create(true)
.open(name)
.unwrap_or_else(|e| {
crash!(2, "open failed: {}: {}", name, strip_errno(&e.to_string()))
}),
)
}),
}
fn new(name: Option<&str>) -> UResult<Self> {
let file = if let Some(name) = name {
// This is different from `File::create()` because we don't truncate the output yet.
// This allows using the output file as an input file.
let file = OpenOptions::new()
.write(true)
.create(true)
.open(name)
.map_err(|e| SortError::OpenFailed {
path: name.to_owned(),
error: e,
})?;
Some((name.to_owned(), file))
} else {
None
};
Ok(Self { file })
}
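
The comment in `Output::new` is the key detail: `File::create` would imply `.create(true).truncate(true)`, while this open skips the truncate, so a path given both as an input and to `-o` keeps its data readable until sort actually writes. Isolated (the path is hypothetical):

use std::fs::{File, OpenOptions};
use std::io;

// Open for writing but do NOT truncate yet, so an output file that is
// also an input keeps its contents readable until the first write.
fn open_output(name: &str) -> io::Result<File> {
    OpenOptions::new().write(true).create(true).open(name)
}

fn main() -> io::Result<()> {
    let _file = open_output("output.txt")?; // hypothetical path
    Ok(())
}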
fn into_write(self) -> BufWriter<Box<dyn Write>> {
@@ -724,33 +833,37 @@ impl FieldSelector {
}
}
fn parse(key: &str, global_settings: &GlobalSettings) -> Self {
fn parse(key: &str, global_settings: &GlobalSettings) -> UResult<Self> {
let mut from_to = key.split(',');
let (from, from_options) = Self::split_key_options(from_to.next().unwrap());
let to = from_to.next().map(|to| Self::split_key_options(to));
let options_are_empty = from_options.is_empty() && matches!(to, None | Some((_, "")));
crash_if_err!(
2,
if options_are_empty {
// Inherit the global settings if there are no options attached to this key.
(|| {
// This would be ideal for a try block, I think. In the meantime this closure allows
// to use the `?` operator here.
Self::new(
KeyPosition::new(from, 1, global_settings.ignore_leading_blanks)?,
to.map(|(to, _)| {
KeyPosition::new(to, 0, global_settings.ignore_leading_blanks)
})
.transpose()?,
KeySettings::from(global_settings),
)
})()
} else {
// Do not inherit from `global_settings`, as there are options attached to this key.
Self::parse_with_options((from, from_options), to)
if options_are_empty {
// Inherit the global settings if there are no options attached to this key.
(|| {
// This would be ideal for a try block, I think. In the meantime this closure allows
// to use the `?` operator here.
Self::new(
KeyPosition::new(from, 1, global_settings.ignore_leading_blanks)?,
to.map(|(to, _)| {
KeyPosition::new(to, 0, global_settings.ignore_leading_blanks)
})
.transpose()?,
KeySettings::from(global_settings),
)
})()
} else {
// Do not inherit from `global_settings`, as there are options attached to this key.
Self::parse_with_options((from, from_options), to)
}
.map_err(|msg| {
SortError::ParseKeyError {
key: key.to_owned(),
msg,
}
.map_err(|e| format!("failed to parse key `{}`: {}", key, e))
)
.into()
})
}
fn parse_with_options(
@@ -962,7 +1075,8 @@ fn make_sort_mode_arg<'a, 'b>(mode: &'a str, short: &'b str, help: &'b str) -> A
arg
}
pub fn uumain(args: impl uucore::Args) -> i32 {
#[uucore_procs::gen_uumain]
pub fn uumain(args: impl uucore::Args) -> UResult<()> {
let args = args
.collect_str(InvalidEncodingHandling::Ignore)
.accept_any();
@@ -979,11 +1093,11 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
// (clap returns 1).
if e.use_stderr() {
eprintln!("{}", e.message);
return 2;
set_exit_code(2);
} else {
println!("{}", e.message);
return 0;
}
return Ok(());
}
};
@@ -998,7 +1112,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
let mut files = Vec::new();
for path in &files0_from {
let reader = open(&path);
let reader = open(&path)?;
let buf_reader = BufReader::new(reader);
for line in buf_reader.split(b'\0').flatten() {
files.push(OsString::from(
@@ -1055,12 +1169,14 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
env::set_var("RAYON_NUM_THREADS", &settings.threads);
}
settings.buffer_size = matches
.value_of(options::BUF_SIZE)
.map_or(DEFAULT_BUF_SIZE, |s| {
GlobalSettings::parse_byte_count(s)
.unwrap_or_else(|e| crash!(2, "{}", format_error_message(e, s, options::BUF_SIZE)))
});
settings.buffer_size =
matches
.value_of(options::BUF_SIZE)
.map_or(Ok(DEFAULT_BUF_SIZE), |s| {
GlobalSettings::parse_byte_count(s).map_err(|e| {
USimpleError::new(2, format_error_message(e, s, options::BUF_SIZE))
})
})?;
settings.tmp_dir = matches
.value_of(options::TMP_DIR)
@@ -1070,9 +1186,9 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
settings.compress_prog = matches.value_of(options::COMPRESS_PROG).map(String::from);
if let Some(n_merge) = matches.value_of(options::BATCH_SIZE) {
settings.merge_batch_size = n_merge
.parse()
.unwrap_or_else(|_| crash!(2, "invalid --batch-size argument '{}'", n_merge));
settings.merge_batch_size = n_merge.parse().map_err(|_| {
UUsageError::new(2, format!("invalid --batch-size argument '{}'", n_merge))
})?;
}
settings.zero_terminated = matches.is_present(options::ZERO_TERMINATED);
@@ -1101,11 +1217,13 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
/* if no file, default to stdin */
files.push("-".to_string().into());
} else if settings.check && files.len() != 1 {
crash!(
return Err(UUsageError::new(
2,
"extra operand `{}' not allowed with -c",
files[1].to_string_lossy()
)
format!(
"extra operand `{}' not allowed with -c",
files[1].to_string_lossy()
),
));
}
if let Some(arg) = matches.args.get(options::SEPARATOR) {
@@ -1115,14 +1233,17 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
separator = "\0";
}
if separator.len() != 1 {
crash!(2, "separator must be exactly one character long");
return Err(UUsageError::new(
2,
"separator must be exactly one character long".into(),
));
}
settings.separator = Some(separator.chars().next().unwrap())
}
if let Some(values) = matches.values_of(options::KEY) {
for value in values {
let selector = FieldSelector::parse(value, &settings);
let selector = FieldSelector::parse(value, &settings)?;
if selector.settings.mode == SortMode::Random && settings.salt.is_none() {
settings.salt = Some(get_rand_string());
}
@@ -1152,10 +1273,10 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
// and to reopen them at a later point. This is different from how the output file is handled,
// probably to prevent running out of file descriptors.
for file in &files {
open(file);
open(file)?;
}
let output = Output::new(matches.value_of(options::OUTPUT));
let output = Output::new(matches.value_of(options::OUTPUT))?;
settings.init_precomputed();
@@ -1382,21 +1503,20 @@ pub fn uu_app() -> App<'static, 'static> {
)
}
fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> i32 {
fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UResult<()> {
if settings.merge {
let mut file_merger = merge::merge(files, settings, output.as_output_name());
file_merger.write_all(settings, output);
let file_merger = merge::merge(files, settings, output.as_output_name())?;
file_merger.write_all(settings, output)
} else if settings.check {
if files.len() > 1 {
crash!(2, "only one file allowed with -c");
Err(UUsageError::new(2, "only one file allowed with -c".into()))
} else {
check::check(files.first().unwrap(), settings)
}
return check::check(files.first().unwrap(), settings);
} else {
let mut lines = files.iter().map(open);
ext_sort(&mut lines, settings, output);
ext_sort(&mut lines, settings, output)
}
0
}
fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings, line_data: &LineData<'a>) {
@@ -1692,25 +1812,22 @@ fn strip_errno(err: &str) -> &str {
&err[..err.find(" (os error ").unwrap_or(err.len())]
}
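
`strip_errno` (context above) trims the ` (os error N)` suffix that the `Display` impl of `std::io::Error` appends, since the surrounding messages already say what failed. A quick demonstration (exact wording is platform-dependent):

fn strip_errno(err: &str) -> &str {
    &err[..err.find(" (os error ").unwrap_or(err.len())]
}

fn main() {
    let err = std::io::Error::from_raw_os_error(2);
    // On Linux this prints "No such file or directory".
    println!("{}", strip_errno(&err.to_string()));
}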
fn open(path: impl AsRef<OsStr>) -> Box<dyn Read + Send> {
fn open(path: impl AsRef<OsStr>) -> UResult<Box<dyn Read + Send>> {
let path = path.as_ref();
if path == "-" {
let stdin = stdin();
return Box::new(stdin) as Box<dyn Read + Send>;
return Ok(Box::new(stdin) as Box<dyn Read + Send>);
}
let path = Path::new(path);
match File::open(path) {
Ok(f) => Box::new(f) as Box<dyn Read + Send>,
Err(e) => {
crash!(
2,
"cannot read: {0}: {1}",
path.to_string_lossy(),
strip_errno(&e.to_string())
);
Ok(f) => Ok(Box::new(f) as Box<dyn Read + Send>),
Err(error) => Err(SortError::ReadFailed {
path: path.to_string_lossy().to_string(),
error,
}
.into()),
}
}
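
Finally, the `.into()` calls sprinkled through this diff rely on a conversion from `SortError` into the boxed error type inside `UResult`; uucore presumably provides a blanket `From` impl for `UCustomError` implementors, mirroring how `Box<dyn Error>` works in std. A std-only analogue (`ReadFailed`/`open_demo` are illustrative):

use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct ReadFailed;

impl fmt::Display for ReadFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "cannot read")
    }
}

impl Error for ReadFailed {}

// `Box<dyn Error>` has a blanket `From<E: Error>` impl, which is what
// makes the trailing `.into()` calls work in this std-only analogue.
fn open_demo(fail: bool) -> Result<(), Box<dyn Error>> {
    if fail {
        return Err(ReadFailed.into());
    }
    Ok(())
}

fn main() {
    assert!(open_demo(true).is_err());
}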