
Merge pull request #2544 from miDeb/sort/delete-tmps

sort: delete temporary files when sort is terminated
Commit 32d281bbcc, authored by Sylvestre Ledru on 2021-08-06 11:52:06 +02:00, committed by GitHub (GPG key ID: 4AEE18F83AFDEB23).
7 changed files with 180 additions and 72 deletions
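
The gist of the change: sort's temporary files now all come from a single, lazily created scratch directory, and a termination handler (via the new ctrlc dependency) deletes that directory and exits with status 2 when the process is interrupted; a mutex keeps the handler from racing with the allocation of a new file path. Below is a minimal standalone sketch of that pattern, not the PR's code: it assumes only the tempfile and ctrlc crates that the diff adds, and the next_file_path helper is written as a closure purely for illustration.

use std::{
    path::PathBuf,
    sync::{Arc, Mutex},
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One scratch directory per process, created under the system temp dir
    // (the real code creates it lazily, on the first request for a file).
    let tmp_dir = tempfile::Builder::new()
        .prefix("uutils_sort")
        .tempdir_in(std::env::temp_dir())?;
    let dir_path = tmp_dir.path().to_owned();

    // The counter's mutex doubles as the cleanup lock: the signal handler takes it
    // so that no new file path can be handed out while the directory is deleted.
    let counter = Arc::new(Mutex::new(0usize));

    let handler_counter = Arc::clone(&counter);
    let handler_path = dir_path.clone();
    ctrlc::set_handler(move || {
        let _guard = handler_counter.lock().unwrap();
        let _ = std::fs::remove_dir_all(&handler_path);
        std::process::exit(2);
    })?;

    // Hand out numbered paths inside the directory, like the wrapper's next_file_path.
    let next_file_path = || -> PathBuf {
        let mut n = counter.lock().unwrap();
        let path = dir_path.join(n.to_string());
        *n += 1;
        path
    };

    println!("first chunk would go to {}", next_file_path().display());
    println!("second chunk would go to {}", next_file_path().display());
    Ok(())
}

On a normal exit the TempDir guard still removes the directory when it is dropped; the handler only covers the interrupted case.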

Cargo.lock (generated), 26 changed lines

@@ -17,6 +17,12 @@ dependencies = [
  "memchr 2.4.0",
 ]
 
+[[package]]
+name = "aliasable"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
+
 [[package]]
 name = "ansi_term"
 version = "0.11.0"
@@ -589,6 +595,16 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "ctrlc"
+version = "3.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "232295399409a8b7ae41276757b5a1cc21032848d42bff2352261f958b3ca29a"
+dependencies = [
+ "nix 0.20.0",
+ "winapi 0.3.9",
+]
+
 [[package]]
 name = "custom_derive"
 version = "0.1.7"
@@ -1171,19 +1187,20 @@ dependencies = [
 [[package]]
 name = "ouroboros"
-version = "0.9.5"
+version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fbeff60e3e37407a80ead3e9458145b456e978c4068cddbfea6afb48572962ca"
+checksum = "84236d64f1718c387232287cf036eb6632a5ecff226f4ff9dccb8c2b79ba0bde"
 dependencies = [
+ "aliasable",
  "ouroboros_macro",
  "stable_deref_trait",
 ]
 
 [[package]]
 name = "ouroboros_macro"
-version = "0.9.5"
+version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03f2cb802b5bdfdf52f1ffa0b54ce105e4d346e91990dd571f86c91321ad49e2"
+checksum = "f463857a6eb96c0136b1d56e56c718350cef30412ec065b48294799a088bca68"
 dependencies = [
  "Inflector",
  "proc-macro-error",
@@ -2752,6 +2769,7 @@ dependencies = [
  "binary-heap-plus",
  "clap",
  "compare",
+ "ctrlc",
  "fnv",
  "itertools 0.10.1",
  "memchr 2.4.0",

src/uu/sort/Cargo.toml

@@ -18,10 +18,11 @@ path = "src/sort.rs"
 binary-heap-plus = "0.4.1"
 clap = { version = "2.33", features = ["wrap_help"] }
 compare = "0.1.0"
+ctrlc = { version = "3.0", features = ["termination"] }
 fnv = "1.0.7"
 itertools = "0.10.0"
 memchr = "2.4.0"
-ouroboros = "0.9.3"
+ouroboros = "0.10.1"
 rand = "0.7"
 rayon = "1.5"
 tempfile = "3"

src/uu/sort/src/ext_sort.rs

@@ -13,7 +13,6 @@
 use std::cmp::Ordering;
 use std::io::Write;
-use std::path::Path;
 use std::path::PathBuf;
 use std::{
     io::Read,
@@ -29,14 +28,13 @@ use crate::merge::ClosedTmpFile;
 use crate::merge::WriteableCompressedTmpFile;
 use crate::merge::WriteablePlainTmpFile;
 use crate::merge::WriteableTmpFile;
+use crate::tmp_dir::TmpDirWrapper;
 use crate::Output;
-use crate::SortError;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, sort_by, GlobalSettings,
 };
 use crate::{print_sorted, Line};
-use tempfile::TempDir;
 
 const START_BUFFER_SIZE: usize = 8_000;
@@ -45,6 +43,7 @@ pub fn ext_sort(
     files: &mut impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
     settings: &GlobalSettings,
     output: Output,
+    tmp_dir: &mut TmpDirWrapper,
 ) -> UResult<()> {
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
@@ -59,6 +58,7 @@ pub fn ext_sort(
             sorted_receiver,
             recycled_sender,
             output,
+            tmp_dir,
         )
     } else {
         reader_writer::<_, WriteablePlainTmpFile>(
@@ -67,6 +67,7 @@ pub fn ext_sort(
             sorted_receiver,
             recycled_sender,
             output,
+            tmp_dir,
         )
     }
 }
@@ -80,6 +81,7 @@ fn reader_writer<
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
     output: Output,
+    tmp_dir: &mut TmpDirWrapper,
 ) -> UResult<()> {
     let separator = if settings.zero_terminated {
         b'\0'
@@ -92,7 +94,7 @@ fn reader_writer<
     let buffer_size = settings.buffer_size / 10;
     let read_result: ReadResult<Tmp> = read_write_loop(
         files,
-        &settings.tmp_dir,
+        tmp_dir,
         separator,
         buffer_size,
         settings,
@@ -100,12 +102,11 @@ fn reader_writer<
         sender,
     )?;
     match read_result {
-        ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
-            let tmp_dir_size = tmp_files.len();
+        ReadResult::WroteChunksToFile { tmp_files } => {
            let merger = merge::merge_with_file_limit::<_, _, Tmp>(
                tmp_files.into_iter().map(|c| c.reopen()),
                settings,
-                Some((tmp_dir, tmp_dir_size)),
+                tmp_dir,
            )?;
            merger.write_all(settings, output)?;
        }
@@ -176,15 +177,12 @@ enum ReadResult<I: WriteableTmpFile> {
     /// The input fits into two chunks, which were kept in memory.
     SortedTwoChunks([Chunk; 2]),
     /// The input was read into multiple chunks, which were written to auxiliary files.
-    WroteChunksToFile {
-        tmp_files: Vec<I::Closed>,
-        tmp_dir: TempDir,
-    },
+    WroteChunksToFile { tmp_files: Vec<I::Closed> },
 }
 
 /// The function that is executed on the reader/writer thread.
 fn read_write_loop<I: WriteableTmpFile>(
     mut files: impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
-    tmp_dir_parent: &Path,
+    tmp_dir: &mut TmpDirWrapper,
     separator: u8,
     buffer_size: usize,
     settings: &GlobalSettings,
@@ -228,32 +226,24 @@ fn read_write_loop<I: WriteableTmpFile>(
         }
     }
 
-    let tmp_dir = tempfile::Builder::new()
-        .prefix("uutils_sort")
-        .tempdir_in(tmp_dir_parent)
-        .map_err(|_| SortError::TmpDirCreationFailed)?;
-
     let mut sender_option = Some(sender);
-    let mut file_number = 0;
     let mut tmp_files = vec![];
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
             _ => {
-                return Ok(ReadResult::WroteChunksToFile { tmp_files, tmp_dir });
+                return Ok(ReadResult::WroteChunksToFile { tmp_files });
             }
         };
         let tmp_file = write::<I>(
             &mut chunk,
-            tmp_dir.path().join(file_number.to_string()),
+            tmp_dir.next_file_path()?,
             settings.compress_prog.as_deref(),
             separator,
         )?;
         tmp_files.push(tmp_file);
-        file_number += 1;
         let recycled_chunk = chunk.recycle();
         if let Some(sender) = &sender_option {

src/uu/sort/src/merge.rs

@@ -22,45 +22,41 @@ use std::{
 use compare::Compare;
 use itertools::Itertools;
-use tempfile::TempDir;
 use uucore::error::UResult;
 
 use crate::{
     chunks::{self, Chunk, RecycledChunk},
-    compare_by, open, GlobalSettings, Output, SortError,
+    compare_by, open,
+    tmp_dir::TmpDirWrapper,
+    GlobalSettings, Output, SortError,
 };
 
 /// If the output file occurs in the input files as well, copy the contents of the output file
 /// and replace its occurrences in the inputs with that copy.
 fn replace_output_file_in_input_files(
     files: &mut [OsString],
-    settings: &GlobalSettings,
     output: Option<&str>,
-) -> UResult<Option<(TempDir, usize)>> {
-    let mut copy: Option<(TempDir, PathBuf)> = None;
+    tmp_dir: &mut TmpDirWrapper,
+) -> UResult<()> {
+    let mut copy: Option<PathBuf> = None;
     if let Some(Ok(output_path)) = output.map(|path| Path::new(path).canonicalize()) {
         for file in files {
             if let Ok(file_path) = Path::new(file).canonicalize() {
                 if file_path == output_path {
-                    if let Some((_dir, copy)) = &copy {
+                    if let Some(copy) = &copy {
                         *file = copy.clone().into_os_string();
                     } else {
-                        let tmp_dir = tempfile::Builder::new()
-                            .prefix("uutils_sort")
-                            .tempdir_in(&settings.tmp_dir)
-                            .map_err(|_| SortError::TmpDirCreationFailed)?;
-                        let copy_path = tmp_dir.path().join("0");
+                        let copy_path = tmp_dir.next_file_path()?;
                         std::fs::copy(file_path, &copy_path)
                             .map_err(|error| SortError::OpenTmpFileFailed { error })?;
                         *file = copy_path.clone().into_os_string();
-                        copy = Some((tmp_dir, copy_path))
+                        copy = Some(copy_path)
                     }
                 }
             }
         }
     }
-    // if we created a TempDir its size must be one.
-    Ok(copy.map(|(dir, _copy)| (dir, 1)))
+    Ok(())
 }
 
 /// Merge pre-sorted `Box<dyn Read>`s.
@@ -71,8 +67,9 @@ pub fn merge<'a>(
     files: &mut [OsString],
     settings: &'a GlobalSettings,
     output: Option<&str>,
+    tmp_dir: &mut TmpDirWrapper,
 ) -> UResult<FileMerger<'a>> {
-    let tmp_dir = replace_output_file_in_input_files(files, settings, output)?;
+    replace_output_file_in_input_files(files, output, tmp_dir)?;
     if settings.compress_prog.is_none() {
         merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
             files
@@ -94,26 +91,16 @@ pub fn merge<'a>(
 // Merge already sorted `MergeInput`s.
 pub fn merge_with_file_limit<
+    'a,
     M: MergeInput + 'static,
     F: ExactSizeIterator<Item = UResult<M>>,
     Tmp: WriteableTmpFile + 'static,
 >(
     files: F,
-    settings: &GlobalSettings,
-    tmp_dir: Option<(TempDir, usize)>,
-) -> UResult<FileMerger> {
+    settings: &'a GlobalSettings,
+    tmp_dir: &mut TmpDirWrapper,
+) -> UResult<FileMerger<'a>> {
     if files.len() > settings.merge_batch_size {
-        // If we did not get a tmp_dir, create one.
-        let (tmp_dir, mut tmp_dir_size) = match tmp_dir {
-            Some(x) => x,
-            None => (
-                tempfile::Builder::new()
-                    .prefix("uutils_sort")
-                    .tempdir_in(&settings.tmp_dir)
-                    .map_err(|_| SortError::TmpDirCreationFailed)?,
-                0,
-            ),
-        };
         let mut remaining_files = files.len();
         let batches = files.chunks(settings.merge_batch_size);
         let mut batches = batches.into_iter();
@@ -122,11 +109,8 @@ pub fn merge_with_file_limit<
             // Work around the fact that `Chunks` is not an `ExactSizeIterator`.
             remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
             let merger = merge_without_limit(batches.next().unwrap(), settings)?;
-            let mut tmp_file = Tmp::create(
-                tmp_dir.path().join(tmp_dir_size.to_string()),
-                settings.compress_prog.as_deref(),
-            )?;
-            tmp_dir_size += 1;
+            let mut tmp_file =
+                Tmp::create(tmp_dir.next_file_path()?, settings.compress_prog.as_deref())?;
             merger.write_all_to(settings, tmp_file.as_write())?;
             temporary_files.push(tmp_file.finished_writing()?);
         }
@@ -139,7 +123,7 @@ pub fn merge_with_file_limit<
                 dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
             >),
             settings,
-            Some((tmp_dir, tmp_dir_size)),
+            tmp_dir,
         )
     } else {
         merge_without_limit(files, settings)

src/uu/sort/src/sort.rs

@@ -22,6 +22,7 @@ mod custom_str_cmp;
 mod ext_sort;
 mod merge;
 mod numeric_str_cmp;
+mod tmp_dir;
 
 use chunks::LineData;
 use clap::{crate_version, App, Arg};
@@ -49,6 +50,8 @@ use uucore::parse_size::{parse_size, ParseSizeError};
 use uucore::version_cmp::version_cmp;
 use uucore::InvalidEncodingHandling;
 
+use crate::tmp_dir::TmpDirWrapper;
+
 const NAME: &str = "sort";
 const ABOUT: &str = "Display sorted concatenation of all FILE(s).";
@@ -317,7 +320,6 @@ pub struct GlobalSettings {
     threads: String,
     zero_terminated: bool,
     buffer_size: usize,
-    tmp_dir: PathBuf,
     compress_prog: Option<String>,
     merge_batch_size: usize,
     precomputed: Precomputed,
@@ -400,7 +402,6 @@ impl Default for GlobalSettings {
             threads: String::new(),
             zero_terminated: false,
             buffer_size: DEFAULT_BUF_SIZE,
-            tmp_dir: PathBuf::new(),
             compress_prog: None,
             merge_batch_size: 32,
             precomputed: Precomputed {
@@ -1178,10 +1179,12 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
         })
     })?;
 
-    settings.tmp_dir = matches
-        .value_of(options::TMP_DIR)
-        .map(PathBuf::from)
-        .unwrap_or_else(env::temp_dir);
+    let mut tmp_dir = TmpDirWrapper::new(
+        matches
+            .value_of(options::TMP_DIR)
+            .map(PathBuf::from)
+            .unwrap_or_else(env::temp_dir),
+    );
 
     settings.compress_prog = matches.value_of(options::COMPRESS_PROG).map(String::from);
@@ -1280,7 +1283,7 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
     settings.init_precomputed();
 
-    exec(&mut files, &settings, output)
+    exec(&mut files, &settings, output, &mut tmp_dir)
 }
 
 pub fn uu_app() -> App<'static, 'static> {
@@ -1503,9 +1506,14 @@ pub fn uu_app() -> App<'static, 'static> {
         )
 }
 
-fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UResult<()> {
+fn exec(
+    files: &mut [OsString],
+    settings: &GlobalSettings,
+    output: Output,
+    tmp_dir: &mut TmpDirWrapper,
+) -> UResult<()> {
     if settings.merge {
-        let file_merger = merge::merge(files, settings, output.as_output_name())?;
+        let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?;
         file_merger.write_all(settings, output)
     } else if settings.check {
         if files.len() > 1 {
@@ -1515,7 +1523,7 @@ fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UResult<()> {
         }
     } else {
         let mut lines = files.iter().map(open);
-        ext_sort(&mut lines, settings, output)
+        ext_sort(&mut lines, settings, output, tmp_dir)
     }
 }

src/uu/sort/src/tmp_dir.rs (new file)

@@ -0,0 +1,80 @@
use std::{
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
};

use tempfile::TempDir;
use uucore::error::{UResult, USimpleError};

use crate::SortError;

/// A wrapper around TempDir that may only exist once in a process.
///
/// `TmpDirWrapper` handles the allocation of new temporary files in this temporary directory and
/// deleting the whole directory when `SIGINT` is received. Creating a second `TmpDirWrapper` will
/// fail because `ctrlc::set_handler()` fails when there's already a handler.
/// The directory is only created once the first file is requested.
pub struct TmpDirWrapper {
    temp_dir: Option<TempDir>,
    parent_path: PathBuf,
    size: usize,
    lock: Arc<Mutex<()>>,
}

impl TmpDirWrapper {
    pub fn new(path: PathBuf) -> Self {
        Self {
            parent_path: path,
            size: 0,
            temp_dir: None,
            lock: Default::default(),
        }
    }

    fn init_tmp_dir(&mut self) -> UResult<()> {
        assert!(self.temp_dir.is_none());
        assert_eq!(self.size, 0);
        self.temp_dir = Some(
            tempfile::Builder::new()
                .prefix("uutils_sort")
                .tempdir_in(&self.parent_path)
                .map_err(|_| SortError::TmpDirCreationFailed)?,
        );

        let path = self.temp_dir.as_ref().unwrap().path().to_owned();
        let lock = self.lock.clone();
        ctrlc::set_handler(move || {
            // Take the lock so that `next_file_path` returns no new file path.
            let _lock = lock.lock().unwrap();
            if let Err(e) = remove_tmp_dir(&path) {
                show_error!("failed to delete temporary directory: {}", e);
            }
            std::process::exit(2)
        })
        .map_err(|e| USimpleError::new(2, format!("failed to set up signal handler: {}", e)))
    }

    pub fn next_file_path(&mut self) -> UResult<PathBuf> {
        if self.temp_dir.is_none() {
            self.init_tmp_dir()?;
        }

        let _lock = self.lock.lock().unwrap();
        let file_name = self.size.to_string();
        self.size += 1;
        Ok(self.temp_dir.as_ref().unwrap().path().join(file_name))
    }
}

/// Remove the directory at `path` by deleting its child files and then itself.
/// Errors while deleting child files are ignored.
fn remove_tmp_dir(path: &Path) -> std::io::Result<()> {
    if let Ok(read_dir) = std::fs::read_dir(&path) {
        for file in read_dir.flatten() {
            // if we fail to delete the file here it was probably deleted by another thread
            // in the meantime, but that's ok.
            let _ = std::fs::remove_file(file.path());
        }
    }
    std::fs::remove_dir(path)
}

tests/by-util/test_sort.rs

@@ -1091,3 +1091,30 @@ fn test_wrong_args_exit_code() {
         .status_code(2)
         .stderr_contains("--misspelled");
 }
+
+#[test]
+#[cfg(unix)]
+fn test_tmp_files_deleted_on_sigint() {
+    use std::{fs::read_dir, time::Duration};
+
+    use nix::{sys::signal, unistd::Pid};
+
+    let (at, mut ucmd) = at_and_ucmd!();
+    at.mkdir("tmp_dir");
+    ucmd.args(&[
+        "ext_sort.txt",
+        "--buffer-size=1", // with a small buffer size `sort` will be forced to create a temporary directory very soon.
+        "--temporary-directory=tmp_dir",
+    ]);
+    let mut child = ucmd.run_no_wait();
+    // wait a short amount of time so that `sort` can create a temporary directory.
+    std::thread::sleep(Duration::from_millis(100));
+    // `sort` should have created a temporary directory.
+    assert!(read_dir(at.plus("tmp_dir")).unwrap().next().is_some());
+    // kill sort with SIGINT
+    signal::kill(Pid::from_raw(child.id() as i32), signal::SIGINT).unwrap();
+    // wait for `sort` to exit
+    assert_eq!(child.wait().unwrap().code(), Some(2));
+    // `sort` should have deleted the temporary directory again.
+    assert!(read_dir(at.plus("tmp_dir")).unwrap().next().is_none());
+}