Mirror of https://github.com/RGBCube/uutils-coreutils, synced 2025-07-28 11:37:44 +00:00
Merge pull request #2538 from miDeb/sort/input-is-output
sort: handle cases where the output file is also an input file
This commit is contained in: commit 80712415d6
6 changed files with 181 additions and 48 deletions
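Background for the diff below: previously `sort` opened its `-o` output with `File::create`, which truncates the file as soon as it is opened. If the same path was also given as an input (for example `sort -m -u -o file file file`), the input was emptied before it could be read. The change fixes this in two steps: occurrences of the output file among the inputs are redirected to a temporary copy, and truncation of the output is deferred until writing actually starts. The following is a minimal sketch of the first idea only, not the project's code; the helper name `redirect_output_in_inputs`, the copy file name, and the simplified error handling are invented for illustration.

use std::ffi::OsString;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical helper: if `output` also appears among `inputs`, copy it into
/// `tmp_dir` once and point every matching input at that copy instead.
fn redirect_output_in_inputs(
    inputs: &mut [OsString],
    output: &str,
    tmp_dir: &Path,
) -> io::Result<()> {
    // Canonicalize so that `file`, `./file` and an absolute spelling all compare equal.
    let output_path = Path::new(output).canonicalize()?;
    let mut copy: Option<PathBuf> = None;
    for input in inputs.iter_mut() {
        if let Ok(input_path) = Path::new(input.as_os_str()).canonicalize() {
            if input_path == output_path {
                if let Some(existing) = &copy {
                    *input = existing.clone().into_os_string();
                } else {
                    let copy_path = tmp_dir.join("output_copy");
                    // Snapshot the data before sort starts writing to the output file.
                    fs::copy(&output_path, &copy_path)?;
                    *input = copy_path.clone().into_os_string();
                    copy = Some(copy_path);
                }
            }
        }
    }
    Ok(())
}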
check.rs

@@ -14,6 +14,7 @@ use crate::{
use itertools::Itertools;
use std::{
    cmp::Ordering,
    ffi::OsStr,
    io::Read,
    iter,
    sync::mpsc::{sync_channel, Receiver, SyncSender},

@@ -25,7 +26,7 @@ use std::{
/// # Returns
///
/// The code we should exit with.
pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
pub fn check(path: &OsStr, settings: &GlobalSettings) -> i32 {
    let max_allowed_cmp = if settings.unique {
        // If `unique` is enabled, the previous line must compare _less_ to the next one.
        Ordering::Less

@@ -41,7 +42,13 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
        move || reader(file, recycled_receiver, loaded_sender, &settings)
    });
    for _ in 0..2 {
        let _ = recycled_sender.send(RecycledChunk::new(100 * 1024));
        let _ = recycled_sender.send(RecycledChunk::new(if settings.buffer_size < 100 * 1024 {
            // when the buffer size is smaller than 100KiB we choose it instead of the default.
            // this improves testability.
            settings.buffer_size
        } else {
            100 * 1024
        }));
    }

    let mut prev_chunk: Option<Chunk> = None;

@@ -63,7 +70,12 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
            ) > max_allowed_cmp
            {
                if !settings.check_silent {
                    eprintln!("sort: {}:{}: disorder: {}", path, line_idx, new_first.line);
                    eprintln!(
                        "sort: {}:{}: disorder: {}",
                        path.to_string_lossy(),
                        line_idx,
                        new_first.line
                    );
                }
                return 1;
            }

@@ -74,7 +86,12 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
            line_idx += 1;
            if compare_by(a, b, settings, chunk.line_data(), chunk.line_data()) > max_allowed_cmp {
                if !settings.check_silent {
                    eprintln!("sort: {}:{}: disorder: {}", path, line_idx, b.line);
                    eprintln!(
                        "sort: {}:{}: disorder: {}",
                        path.to_string_lossy(),
                        line_idx,
                        b.line
                    );
                }
                return 1;
            }

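A side effect of `check` taking `&OsStr` instead of `&str` is that the path can no longer be formatted with `{}` directly, because `OsStr` has no `Display` implementation; the diagnostic therefore goes through `to_string_lossy()`, which yields a `Cow<str>` and replaces any invalid UTF-8 with U+FFFD. A small standalone illustration of that std behaviour (the function name and arguments here are made up for the example):

use std::ffi::OsStr;

fn report_disorder(path: &OsStr, line_idx: usize, line: &str) {
    // `OsStr` cannot be used with `{}` directly; `to_string_lossy()` makes it printable.
    eprintln!("sort: {}:{}: disorder: {}", path.to_string_lossy(), line_idx, line);
}

fn main() {
    report_disorder(OsStr::new("some_file.txt"), 6, "5");
}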
ext_sort.rs

@@ -89,8 +89,6 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
        files,
        &settings.tmp_dir,
        separator,
        // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
        // around settings.buffer_size as a whole.
        buffer_size,
        settings,
        receiver,

merge.rs

@@ -9,10 +9,11 @@
use std::{
    cmp::Ordering,
    ffi::OsString,
    fs::{self, File},
    io::{BufWriter, Read, Write},
    iter,
    path::PathBuf,
    path::{Path, PathBuf},
    process::{Child, ChildStdin, ChildStdout, Command, Stdio},
    rc::Rc,
    sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},

@@ -25,28 +26,66 @@ use tempfile::TempDir;

use crate::{
    chunks::{self, Chunk, RecycledChunk},
    compare_by, GlobalSettings, Output,
    compare_by, open, GlobalSettings, Output,
};

/// If the output file occurs in the input files as well, copy the contents of the output file
/// and replace its occurrences in the inputs with that copy.
fn replace_output_file_in_input_files(
    files: &mut [OsString],
    settings: &GlobalSettings,
    output: Option<&str>,
) -> Option<(TempDir, usize)> {
    let mut copy: Option<(TempDir, PathBuf)> = None;
    if let Some(Ok(output_path)) = output.map(|path| Path::new(path).canonicalize()) {
        for file in files {
            if let Ok(file_path) = Path::new(file).canonicalize() {
                if file_path == output_path {
                    if let Some((_dir, copy)) = &copy {
                        *file = copy.clone().into_os_string();
                    } else {
                        let tmp_dir = tempfile::Builder::new()
                            .prefix("uutils_sort")
                            .tempdir_in(&settings.tmp_dir)
                            .unwrap();
                        let copy_path = tmp_dir.path().join("0");
                        std::fs::copy(file_path, &copy_path).unwrap();
                        *file = copy_path.clone().into_os_string();
                        copy = Some((tmp_dir, copy_path))
                    }
                }
            }
        }
    }
    // if we created a TempDir its size must be one.
    copy.map(|(dir, _copy)| (dir, 1))
}

/// Merge pre-sorted `Box<dyn Read>`s.
///
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
pub fn merge<Files: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
    files: Files,
    settings: &GlobalSettings,
) -> FileMerger {
pub fn merge<'a>(
    files: &mut [OsString],
    settings: &'a GlobalSettings,
    output: Option<&str>,
) -> FileMerger<'a> {
    let tmp_dir = replace_output_file_in_input_files(files, settings, output);
    if settings.compress_prog.is_none() {
        merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
            files.map(|file| PlainMergeInput { inner: file }),
            files
                .iter()
                .map(|file| PlainMergeInput { inner: open(file) }),
            settings,
            None,
            tmp_dir,
        )
    } else {
        merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
            files.map(|file| PlainMergeInput { inner: file }),
            files
                .iter()
                .map(|file| PlainMergeInput { inner: open(file) }),
            settings,
            None,
            tmp_dir,
        )
    }
}

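Two details of `replace_output_file_in_input_files` are worth noting. Paths are compared after `canonicalize()`, so different spellings of the output path among the inputs are all recognized; and the `TempDir` is handed back to the caller (together with a count of 1, per the comment above) because dropping a `tempfile::TempDir` deletes the directory, which would remove the copy while it is still needed as an input. A small sketch of the comparison behaviour using only the standard library; the file name `out.txt` is invented for the example:

use std::fs::{self, File};
use std::path::Path;

fn main() -> std::io::Result<()> {
    File::create("out.txt")?;
    // Different spellings of the same path canonicalize to the same absolute path,
    // so both are detected as "the output file is also an input".
    let a = Path::new("out.txt").canonicalize()?;
    let b = Path::new("./out.txt").canonicalize()?;
    assert_eq!(a, b);
    fs::remove_file("out.txt")?;
    Ok(())
}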
@@ -155,12 +194,14 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
    let mut mergeable_files = vec![];

    for (file_number, receiver) in loaded_receivers.into_iter().enumerate() {
        mergeable_files.push(MergeableFile {
            current_chunk: Rc::new(receiver.recv().unwrap()),
            file_number,
            line_idx: 0,
            receiver,
        })
        if let Ok(chunk) = receiver.recv() {
            mergeable_files.push(MergeableFile {
                current_chunk: Rc::new(chunk),
                file_number,
                line_idx: 0,
                receiver,
            })
        }
    }

    FileMerger {

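The loop in `merge_without_limit` now tolerates a failed `recv()` instead of unwrapping it. With `std::sync::mpsc`, `recv()` returns `Err(RecvError)` once the sending side has been dropped without ever sending a value, which is exactly what happens for an empty input file (covered by the new `test_merge_empty_input` test further below). A minimal standalone illustration of that channel behaviour:

use std::sync::mpsc::sync_channel;

fn main() {
    let (sender, receiver) = sync_channel::<String>(1);
    // Simulate a reader thread that finds no data and exits without sending anything.
    drop(sender);
    // `recv()` reports the closed channel instead of blocking forever,
    // so the merger can simply skip this input rather than panic on `unwrap()`.
    assert!(receiver.recv().is_err());
}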
sort.rs

@@ -33,10 +33,10 @@ use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::cmp::Ordering;
use std::env;
use std::ffi::OsStr;
use std::fs::File;
use std::ffi::{OsStr, OsString};
use std::fs::{File, OpenOptions};
use std::hash::{Hash, Hasher};
use std::io::{stdin, stdout, BufRead, BufReader, Read, Write};
use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Write};
use std::ops::Range;
use std::path::Path;
use std::path::PathBuf;

@@ -146,24 +146,44 @@ impl SortMode {
}

pub struct Output {
    file: Option<File>,
    file: Option<(String, File)>,
}

impl Output {
    fn new(name: Option<&str>) -> Self {
        Self {
            file: name.map(|name| {
                File::create(name).unwrap_or_else(|e| {
                    crash!(2, "open failed: {}: {}", name, strip_errno(&e.to_string()))
                })
                // This is different from `File::create()` because we don't truncate the output yet.
                // This allows using the output file as an input file.
                (
                    name.to_owned(),
                    OpenOptions::new()
                        .write(true)
                        .create(true)
                        .open(name)
                        .unwrap_or_else(|e| {
                            crash!(2, "open failed: {}: {}", name, strip_errno(&e.to_string()))
                        }),
                )
            }),
        }
    }

    fn into_write(self) -> Box<dyn Write> {
        match self.file {
            Some(file) => Box::new(file),
    fn into_write(self) -> BufWriter<Box<dyn Write>> {
        BufWriter::new(match self.file {
            Some((_name, file)) => {
                // truncate the file
                let _ = file.set_len(0);
                Box::new(file)
            }
            None => Box::new(stdout()),
        })
    }

    fn as_output_name(&self) -> Option<&str> {
        match &self.file {
            Some((name, _file)) => Some(name),
            None => None,
        }
    }
}

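The key change in `Output` is that opening the `-o` file no longer truncates it: `OpenOptions::new().write(true).create(true).open(name)` leaves any existing contents in place, and the file is only emptied with `set_len(0)` once writing is about to begin in `into_write`. A small standalone comparison of the two behaviours; the file name `demo_out.txt` is invented for the example:

use std::fs::{self, OpenOptions};

fn main() -> std::io::Result<()> {
    fs::write("demo_out.txt", "existing data")?;

    // Opening without `truncate(true)` keeps the old contents readable,
    // so the same path can still be consumed as an input in the meantime.
    let file = OpenOptions::new().write(true).create(true).open("demo_out.txt")?;
    assert_eq!(fs::read_to_string("demo_out.txt")?, "existing data");

    // An explicit `set_len(0)` (or `File::create`) is what actually empties it.
    file.set_len(0)?;
    assert_eq!(fs::read_to_string("demo_out.txt")?, "");

    drop(file);
    fs::remove_file("demo_out.txt")?;
    Ok(())
}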
@@ -970,29 +990,28 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
    settings.debug = matches.is_present(options::DEBUG);

    // check whether user specified a zero terminated list of files for input, otherwise read files from args
    let mut files: Vec<String> = if matches.is_present(options::FILES0_FROM) {
        let files0_from: Vec<String> = matches
            .values_of(options::FILES0_FROM)
            .map(|v| v.map(ToString::to_string).collect())
    let mut files: Vec<OsString> = if matches.is_present(options::FILES0_FROM) {
        let files0_from: Vec<OsString> = matches
            .values_of_os(options::FILES0_FROM)
            .map(|v| v.map(ToOwned::to_owned).collect())
            .unwrap_or_default();

        let mut files = Vec::new();
        for path in &files0_from {
            let reader = open(path.as_str());
            let reader = open(&path);
            let buf_reader = BufReader::new(reader);
            for line in buf_reader.split(b'\0').flatten() {
                files.push(
                files.push(OsString::from(
                    std::str::from_utf8(&line)
                        .expect("Could not parse string from zero terminated input.")
                        .to_string(),
                );
                        .expect("Could not parse string from zero terminated input."),
                ));
            }
        }
        files
    } else {
        matches
            .values_of(options::FILES)
            .map(|v| v.map(ToString::to_string).collect())
            .values_of_os(options::FILES)
            .map(|v| v.map(ToOwned::to_owned).collect())
            .unwrap_or_default()
    };

@@ -1080,9 +1099,13 @@ pub fn uumain(args: impl uucore::Args) -> i32 {

    if files.is_empty() {
        /* if no file, default to stdin */
        files.push("-".to_owned());
        files.push("-".to_string().into());
    } else if settings.check && files.len() != 1 {
        crash!(2, "extra operand `{}' not allowed with -c", files[1])
        crash!(
            2,
            "extra operand `{}' not allowed with -c",
            files[1].to_string_lossy()
        )
    }

    if let Some(arg) = matches.args.get(options::SEPARATOR) {

@@ -1136,7 +1159,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {

    settings.init_precomputed();

    exec(&files, &settings, output)
    exec(&mut files, &settings, output)
}

pub fn uu_app() -> App<'static, 'static> {

@@ -1359,9 +1382,9 @@ pub fn uu_app() -> App<'static, 'static> {
    )
}

fn exec(files: &[String], settings: &GlobalSettings, output: Output) -> i32 {
fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> i32 {
    if settings.merge {
        let mut file_merger = merge::merge(files.iter().map(open), settings);
        let mut file_merger = merge::merge(files, settings, output.as_output_name());
        file_merger.write_all(settings, output);
    } else if settings.check {
        if files.len() > 1 {

test_sort.rs

@@ -771,6 +771,7 @@ fn test_check() {
    new_ucmd!()
        .arg(diagnose_arg)
        .arg("check_fail.txt")
        .arg("--buffer-size=10b")
        .fails()
        .stderr_only("sort: check_fail.txt:6: disorder: 5\n");

@@ -892,6 +893,29 @@ fn test_compress() {
        .stdout_only_fixture("ext_sort.expected");
}

#[test]
#[cfg(target_os = "linux")]
fn test_compress_merge() {
    new_ucmd!()
        .args(&[
            "--compress-program",
            "gzip",
            "-S",
            "10",
            "--batch-size=2",
            "-m",
            "--unique",
            "merge_ints_interleaved_1.txt",
            "merge_ints_interleaved_2.txt",
            "merge_ints_interleaved_3.txt",
            "merge_ints_interleaved_3.txt",
            "merge_ints_interleaved_2.txt",
            "merge_ints_interleaved_1.txt",
        ])
        .succeeds()
        .stdout_only_fixture("merge_ints_interleaved.expected");
}

#[test]
fn test_compress_fail() {
    TestScenario::new(util_name!())

@@ -976,6 +1000,7 @@ fn test_verifies_out_file() {
    new_ucmd!()
        .args(&["-o", "nonexistent_dir/nonexistent_file"])
        .pipe_in(input)
        .ignore_stdin_write_error()
        .fails()
        .status_code(2)
        .stderr_only(

@@ -1021,6 +1046,35 @@ fn test_separator_null() {
        .stdout_only("a\0z\0z\nz\0b\0a\nz\0a\0b\n");
}

#[test]
fn test_output_is_input() {
    let input = "a\nb\nc\n";
    let (at, mut cmd) = at_and_ucmd!();
    at.touch("file");
    at.append("file", input);
    cmd.args(&["-m", "-u", "-o", "file", "file", "file", "file"])
        .succeeds();
    assert_eq!(at.read("file"), input);
}

#[test]
#[cfg(unix)]
fn test_output_device() {
    new_ucmd!()
        .args(&["-o", "/dev/null"])
        .pipe_in("input")
        .succeeds();
}

#[test]
fn test_merge_empty_input() {
    new_ucmd!()
        .args(&["-m", "empty.txt"])
        .succeeds()
        .no_stderr()
        .no_stdout();
}

#[test]
fn test_no_error_for_version() {
    new_ucmd!()

tests/fixtures/sort/empty.txt (vendored, normal file): new empty fixture, 0 lines changed