diff --git a/Cargo.lock b/Cargo.lock index 7257cec80..b2abe2e48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ dependencies = [ "memchr 2.4.0", ] +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "ansi_term" version = "0.11.0" @@ -589,6 +595,16 @@ dependencies = [ "syn", ] +[[package]] +name = "ctrlc" +version = "3.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "232295399409a8b7ae41276757b5a1cc21032848d42bff2352261f958b3ca29a" +dependencies = [ + "nix 0.20.0", + "winapi 0.3.9", +] + [[package]] name = "custom_derive" version = "0.1.7" @@ -1171,19 +1187,20 @@ dependencies = [ [[package]] name = "ouroboros" -version = "0.9.5" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeff60e3e37407a80ead3e9458145b456e978c4068cddbfea6afb48572962ca" +checksum = "84236d64f1718c387232287cf036eb6632a5ecff226f4ff9dccb8c2b79ba0bde" dependencies = [ + "aliasable", "ouroboros_macro", "stable_deref_trait", ] [[package]] name = "ouroboros_macro" -version = "0.9.5" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03f2cb802b5bdfdf52f1ffa0b54ce105e4d346e91990dd571f86c91321ad49e2" +checksum = "f463857a6eb96c0136b1d56e56c718350cef30412ec065b48294799a088bca68" dependencies = [ "Inflector", "proc-macro-error", @@ -2752,6 +2769,7 @@ dependencies = [ "binary-heap-plus", "clap", "compare", + "ctrlc", "fnv", "itertools 0.10.1", "memchr 2.4.0", diff --git a/src/uu/sort/Cargo.toml b/src/uu/sort/Cargo.toml index 14db546eb..7faf9906e 100644 --- a/src/uu/sort/Cargo.toml +++ b/src/uu/sort/Cargo.toml @@ -18,10 +18,11 @@ path = "src/sort.rs" binary-heap-plus = "0.4.1" clap = { version = "2.33", features = ["wrap_help"] } compare = "0.1.0" +ctrlc = { version = "3.0", features = ["termination"] } 
fnv = "1.0.7" itertools = "0.10.0" memchr = "2.4.0" -ouroboros = "0.9.3" +ouroboros = "0.10.1" rand = "0.7" rayon = "1.5" tempfile = "3" diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index 8ff5665fd..b4827f962 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -13,7 +13,6 @@ use std::cmp::Ordering; use std::io::Write; -use std::path::Path; use std::path::PathBuf; use std::{ io::Read, @@ -29,14 +28,13 @@ use crate::merge::ClosedTmpFile; use crate::merge::WriteableCompressedTmpFile; use crate::merge::WriteablePlainTmpFile; use crate::merge::WriteableTmpFile; +use crate::tmp_dir::TmpDirWrapper; use crate::Output; -use crate::SortError; use crate::{ chunks::{self, Chunk}, compare_by, merge, sort_by, GlobalSettings, }; use crate::{print_sorted, Line}; -use tempfile::TempDir; const START_BUFFER_SIZE: usize = 8_000; @@ -45,6 +43,7 @@ pub fn ext_sort( files: &mut impl Iterator>>, settings: &GlobalSettings, output: Output, + tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1); let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1); @@ -59,6 +58,7 @@ pub fn ext_sort( sorted_receiver, recycled_sender, output, + tmp_dir, ) } else { reader_writer::<_, WriteablePlainTmpFile>( @@ -67,6 +67,7 @@ pub fn ext_sort( sorted_receiver, recycled_sender, output, + tmp_dir, ) } } @@ -80,6 +81,7 @@ fn reader_writer< receiver: Receiver, sender: SyncSender, output: Output, + tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { let separator = if settings.zero_terminated { b'\0' @@ -92,7 +94,7 @@ fn reader_writer< let buffer_size = settings.buffer_size / 10; let read_result: ReadResult = read_write_loop( files, - &settings.tmp_dir, + tmp_dir, separator, buffer_size, settings, @@ -100,12 +102,11 @@ fn reader_writer< sender, )?; match read_result { - ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => { - let tmp_dir_size = tmp_files.len(); + 
ReadResult::WroteChunksToFile { tmp_files } => { let merger = merge::merge_with_file_limit::<_, _, Tmp>( tmp_files.into_iter().map(|c| c.reopen()), settings, - Some((tmp_dir, tmp_dir_size)), + tmp_dir, )?; merger.write_all(settings, output)?; } @@ -176,15 +177,12 @@ enum ReadResult { /// The input fits into two chunks, which were kept in memory. SortedTwoChunks([Chunk; 2]), /// The input was read into multiple chunks, which were written to auxiliary files. - WroteChunksToFile { - tmp_files: Vec, - tmp_dir: TempDir, - }, + WroteChunksToFile { tmp_files: Vec }, } /// The function that is executed on the reader/writer thread. fn read_write_loop( mut files: impl Iterator>>, - tmp_dir_parent: &Path, + tmp_dir: &mut TmpDirWrapper, separator: u8, buffer_size: usize, settings: &GlobalSettings, @@ -228,32 +226,24 @@ fn read_write_loop( } } - let tmp_dir = tempfile::Builder::new() - .prefix("uutils_sort") - .tempdir_in(tmp_dir_parent) - .map_err(|_| SortError::TmpDirCreationFailed)?; - let mut sender_option = Some(sender); - let mut file_number = 0; let mut tmp_files = vec![]; loop { let mut chunk = match receiver.recv() { Ok(it) => it, _ => { - return Ok(ReadResult::WroteChunksToFile { tmp_files, tmp_dir }); + return Ok(ReadResult::WroteChunksToFile { tmp_files }); } }; let tmp_file = write::( &mut chunk, - tmp_dir.path().join(file_number.to_string()), + tmp_dir.next_file_path()?, settings.compress_prog.as_deref(), separator, )?; tmp_files.push(tmp_file); - file_number += 1; - let recycled_chunk = chunk.recycle(); if let Some(sender) = &sender_option { diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index fad966f64..64a7632bf 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -22,45 +22,41 @@ use std::{ use compare::Compare; use itertools::Itertools; -use tempfile::TempDir; use uucore::error::UResult; use crate::{ chunks::{self, Chunk, RecycledChunk}, - compare_by, open, GlobalSettings, Output, SortError, + compare_by, open, + 
tmp_dir::TmpDirWrapper, + GlobalSettings, Output, SortError, }; /// If the output file occurs in the input files as well, copy the contents of the output file /// and replace its occurrences in the inputs with that copy. fn replace_output_file_in_input_files( files: &mut [OsString], - settings: &GlobalSettings, output: Option<&str>, -) -> UResult<Option<(TempDir, usize)>> { - let mut copy: Option<(TempDir, PathBuf)> = None; + tmp_dir: &mut TmpDirWrapper, +) -> UResult<()> { + let mut copy: Option<PathBuf> = None; if let Some(Ok(output_path)) = output.map(|path| Path::new(path).canonicalize()) { for file in files { if let Ok(file_path) = Path::new(file).canonicalize() { if file_path == output_path { - if let Some((_dir, copy)) = &copy { + if let Some(copy) = &copy { *file = copy.clone().into_os_string(); } else { - let tmp_dir = tempfile::Builder::new() - .prefix("uutils_sort") - .tempdir_in(&settings.tmp_dir) - .map_err(|_| SortError::TmpDirCreationFailed)?; - let copy_path = tmp_dir.path().join("0"); + let copy_path = tmp_dir.next_file_path()?; std::fs::copy(file_path, &copy_path) .map_err(|error| SortError::OpenTmpFileFailed { error })?; *file = copy_path.clone().into_os_string(); - copy = Some((tmp_dir, copy_path)) + copy = Some(copy_path) } } } } } - // if we created a TempDir its size must be one. - Ok(copy.map(|(dir, _copy)| (dir, 1))) + Ok(()) } /// Merge pre-sorted `Box`s. @@ -71,8 +67,9 @@ pub fn merge<'a>( files: &mut [OsString], settings: &'a GlobalSettings, output: Option<&str>, + tmp_dir: &mut TmpDirWrapper, ) -> UResult> { - let tmp_dir = replace_output_file_in_input_files(files, settings, output)?; + replace_output_file_in_input_files(files, output, tmp_dir)?; if settings.compress_prog.is_none() { merge_with_file_limit::<_, _, WriteablePlainTmpFile>( files @@ -94,26 +91,16 @@ pub fn merge<'a>( // Merge already sorted `MergeInput`s. 
pub fn merge_with_file_limit< + 'a, M: MergeInput + 'static, F: ExactSizeIterator>, Tmp: WriteableTmpFile + 'static, >( files: F, - settings: &GlobalSettings, - tmp_dir: Option<(TempDir, usize)>, -) -> UResult { + settings: &'a GlobalSettings, + tmp_dir: &mut TmpDirWrapper, +) -> UResult> { if files.len() > settings.merge_batch_size { - // If we did not get a tmp_dir, create one. - let (tmp_dir, mut tmp_dir_size) = match tmp_dir { - Some(x) => x, - None => ( - tempfile::Builder::new() - .prefix("uutils_sort") - .tempdir_in(&settings.tmp_dir) - .map_err(|_| SortError::TmpDirCreationFailed)?, - 0, - ), - }; let mut remaining_files = files.len(); let batches = files.chunks(settings.merge_batch_size); let mut batches = batches.into_iter(); @@ -122,11 +109,8 @@ pub fn merge_with_file_limit< // Work around the fact that `Chunks` is not an `ExactSizeIterator`. remaining_files = remaining_files.saturating_sub(settings.merge_batch_size); let merger = merge_without_limit(batches.next().unwrap(), settings)?; - let mut tmp_file = Tmp::create( - tmp_dir.path().join(tmp_dir_size.to_string()), - settings.compress_prog.as_deref(), - )?; - tmp_dir_size += 1; + let mut tmp_file = + Tmp::create(tmp_dir.next_file_path()?, settings.compress_prog.as_deref())?; merger.write_all_to(settings, tmp_file.as_write())?; temporary_files.push(tmp_file.finished_writing()?); } @@ -139,7 +123,7 @@ pub fn merge_with_file_limit< dyn FnMut(Tmp::Closed) -> UResult<::Reopened>, >), settings, - Some((tmp_dir, tmp_dir_size)), + tmp_dir, ) } else { merge_without_limit(files, settings) diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index be9510aef..ae869ba49 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -22,6 +22,7 @@ mod custom_str_cmp; mod ext_sort; mod merge; mod numeric_str_cmp; +mod tmp_dir; use chunks::LineData; use clap::{crate_version, App, Arg}; @@ -49,6 +50,8 @@ use uucore::parse_size::{parse_size, ParseSizeError}; use uucore::version_cmp::version_cmp; use 
uucore::InvalidEncodingHandling; +use crate::tmp_dir::TmpDirWrapper; + const NAME: &str = "sort"; const ABOUT: &str = "Display sorted concatenation of all FILE(s)."; @@ -317,7 +320,6 @@ pub struct GlobalSettings { threads: String, zero_terminated: bool, buffer_size: usize, - tmp_dir: PathBuf, compress_prog: Option, merge_batch_size: usize, precomputed: Precomputed, @@ -400,7 +402,6 @@ impl Default for GlobalSettings { threads: String::new(), zero_terminated: false, buffer_size: DEFAULT_BUF_SIZE, - tmp_dir: PathBuf::new(), compress_prog: None, merge_batch_size: 32, precomputed: Precomputed { @@ -1178,10 +1179,12 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { }) })?; - settings.tmp_dir = matches - .value_of(options::TMP_DIR) - .map(PathBuf::from) - .unwrap_or_else(env::temp_dir); + let mut tmp_dir = TmpDirWrapper::new( + matches + .value_of(options::TMP_DIR) + .map(PathBuf::from) + .unwrap_or_else(env::temp_dir), + ); settings.compress_prog = matches.value_of(options::COMPRESS_PROG).map(String::from); @@ -1280,7 +1283,7 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { settings.init_precomputed(); - exec(&mut files, &settings, output) + exec(&mut files, &settings, output, &mut tmp_dir) } pub fn uu_app() -> App<'static, 'static> { @@ -1503,9 +1506,14 @@ pub fn uu_app() -> App<'static, 'static> { ) } -fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UResult<()> { +fn exec( + files: &mut [OsString], + settings: &GlobalSettings, + output: Output, + tmp_dir: &mut TmpDirWrapper, +) -> UResult<()> { if settings.merge { - let file_merger = merge::merge(files, settings, output.as_output_name())?; + let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?; file_merger.write_all(settings, output) } else if settings.check { if files.len() > 1 { @@ -1515,7 +1523,7 @@ fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UR } } else { let mut lines = files.iter().map(open); - 
ext_sort(&mut lines, settings, output) + ext_sort(&mut lines, settings, output, tmp_dir) } } diff --git a/src/uu/sort/src/tmp_dir.rs b/src/uu/sort/src/tmp_dir.rs new file mode 100644 index 000000000..884f2cd00 --- /dev/null +++ b/src/uu/sort/src/tmp_dir.rs @@ -0,0 +1,80 @@ +use std::{ + path::{Path, PathBuf}, + sync::{Arc, Mutex}, +}; + +use tempfile::TempDir; +use uucore::error::{UResult, USimpleError}; + +use crate::SortError; + +/// A wrapper around TempDir that may only exist once in a process. +/// +/// `TmpDirWrapper` handles the allocation of new temporary files in this temporary directory and +/// deleting the whole directory when `SIGINT` is received. Creating a second `TmpDirWrapper` will +/// fail because `ctrlc::set_handler()` fails when there's already a handler. +/// The directory is only created once the first file is requested. +pub struct TmpDirWrapper { + temp_dir: Option<TempDir>, + parent_path: PathBuf, + size: usize, + lock: Arc<Mutex<()>>, +} + +impl TmpDirWrapper { + pub fn new(path: PathBuf) -> Self { + Self { + parent_path: path, + size: 0, + temp_dir: None, + lock: Default::default(), + } + } + + fn init_tmp_dir(&mut self) -> UResult<()> { + assert!(self.temp_dir.is_none()); + assert_eq!(self.size, 0); + self.temp_dir = Some( + tempfile::Builder::new() + .prefix("uutils_sort") + .tempdir_in(&self.parent_path) + .map_err(|_| SortError::TmpDirCreationFailed)?, + ); + + let path = self.temp_dir.as_ref().unwrap().path().to_owned(); + let lock = self.lock.clone(); + ctrlc::set_handler(move || { + // Take the lock so that `next_file_path` returns no new file path. 
+ let _lock = lock.lock().unwrap(); + if let Err(e) = remove_tmp_dir(&path) { + show_error!("failed to delete temporary directory: {}", e); + } + std::process::exit(2) + }) + .map_err(|e| USimpleError::new(2, format!("failed to set up signal handler: {}", e))) + } + + pub fn next_file_path(&mut self) -> UResult<PathBuf> { + if self.temp_dir.is_none() { + self.init_tmp_dir()?; + } + + let _lock = self.lock.lock().unwrap(); + let file_name = self.size.to_string(); + self.size += 1; + Ok(self.temp_dir.as_ref().unwrap().path().join(file_name)) + } +} + +/// Remove the directory at `path` by deleting its child files and then itself. +/// Errors while deleting child files are ignored. +fn remove_tmp_dir(path: &Path) -> std::io::Result<()> { + if let Ok(read_dir) = std::fs::read_dir(&path) { + for file in read_dir.flatten() { + // if we fail to delete the file here it was probably deleted by another thread + // in the meantime, but that's ok. + let _ = std::fs::remove_file(file.path()); + } + } + std::fs::remove_dir(path) +} diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index 1f21722ed..cfe96d3c5 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -1091,3 +1091,30 @@ fn test_wrong_args_exit_code() { .status_code(2) .stderr_contains("--misspelled"); } + +#[test] +#[cfg(unix)] +fn test_tmp_files_deleted_on_sigint() { + use std::{fs::read_dir, time::Duration}; + + use nix::{sys::signal, unistd::Pid}; + + let (at, mut ucmd) = at_and_ucmd!(); + at.mkdir("tmp_dir"); + ucmd.args(&[ + "ext_sort.txt", + "--buffer-size=1", // with a small buffer size `sort` will be forced to create a temporary directory very soon. + "--temporary-directory=tmp_dir", + ]); + let mut child = ucmd.run_no_wait(); + // wait a short amount of time so that `sort` can create a temporary directory. + std::thread::sleep(Duration::from_millis(100)); + // `sort` should have created a temporary directory. 
+ assert!(read_dir(at.plus("tmp_dir")).unwrap().next().is_some()); + // kill sort with SIGINT + signal::kill(Pid::from_raw(child.id() as i32), signal::SIGINT).unwrap(); + // wait for `sort` to exit + assert_eq!(child.wait().unwrap().code(), Some(2)); + // `sort` should have deleted the temporary directory again. + assert!(read_dir(at.plus("tmp_dir")).unwrap().next().is_none()); +}