From 6bdcad32da40e7fe2a1b159c72a455c1452a4baa Mon Sep 17 00:00:00 2001 From: Karl McDowall Date: Wed, 11 Dec 2024 18:27:05 -0700 Subject: [PATCH] sort: Rework merge batching logic Fix bug #6944 Rework the way batching is done with sort such that it doesn't open more input files than necessary. Previously, the code would always open one extra input file which causes problems in ulimit scenarios. Add additional test case. --- src/uu/sort/src/ext_sort.rs | 4 +- src/uu/sort/src/merge.rs | 77 +++++++++++++++++++------------------ src/uu/sort/src/sort.rs | 3 +- tests/by-util/test_sort.rs | 27 ++++++++++++- 4 files changed, 69 insertions(+), 42 deletions(-) diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index 183098812..57e434e99 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -98,12 +98,12 @@ fn reader_writer< )?; match read_result { ReadResult::WroteChunksToFile { tmp_files } => { - let merger = merge::merge_with_file_limit::<_, _, Tmp>( + merge::merge_with_file_limit::<_, _, Tmp>( tmp_files.into_iter().map(|c| c.reopen()), settings, + output, tmp_dir, )?; - merger.write_all(settings, output)?; } ReadResult::SortedSingleChunk(chunk) => { if settings.unique { diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index d6872ec80..300733d1e 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -25,7 +25,6 @@ use std::{ }; use compare::Compare; -use itertools::Itertools; use uucore::error::UResult; use crate::{ @@ -67,58 +66,63 @@ fn replace_output_file_in_input_files( /// /// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used. /// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it. -pub fn merge<'a>( +pub fn merge( files: &mut [OsString], - settings: &'a GlobalSettings, - output: Option<&str>, + settings: &GlobalSettings, + output: Output, tmp_dir: &mut TmpDirWrapper, -) -> UResult> { - replace_output_file_in_input_files(files, output, tmp_dir)?; +) -> UResult<()> { + replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?; + let files = files + .iter() + .map(|file| open(file).map(|file| PlainMergeInput { inner: file })); if settings.compress_prog.is_none() { - merge_with_file_limit::<_, _, WriteablePlainTmpFile>( - files - .iter() - .map(|file| open(file).map(|file| PlainMergeInput { inner: file })), - settings, - tmp_dir, - ) + merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir) } else { - merge_with_file_limit::<_, _, WriteableCompressedTmpFile>( - files - .iter() - .map(|file| open(file).map(|file| PlainMergeInput { inner: file })), - settings, - tmp_dir, - ) + merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir) } } // Merge already sorted `MergeInput`s. pub fn merge_with_file_limit< - 'a, M: MergeInput + 'static, F: ExactSizeIterator>, Tmp: WriteableTmpFile + 'static, >( files: F, - settings: &'a GlobalSettings, + settings: &GlobalSettings, + output: Output, tmp_dir: &mut TmpDirWrapper, -) -> UResult> { - if files.len() > settings.merge_batch_size { - let mut remaining_files = files.len(); - let batches = files.chunks(settings.merge_batch_size); - let mut batches = batches.into_iter(); +) -> UResult<()> { + if files.len() <= settings.merge_batch_size { + let merger = merge_without_limit(files, settings); + merger?.write_all(settings, output) + } else { let mut temporary_files = vec![]; - while remaining_files != 0 { - // Work around the fact that `Chunks` is not an `ExactSizeIterator`. - remaining_files = remaining_files.saturating_sub(settings.merge_batch_size); - let merger = merge_without_limit(batches.next().unwrap(), settings)?; + let mut batch = vec![]; + for file in files { + batch.push(file); + if batch.len() >= settings.merge_batch_size { + assert_eq!(batch.len(), settings.merge_batch_size); + let merger = merge_without_limit(batch.into_iter(), settings)?; + batch = vec![]; + + let mut tmp_file = + Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + merger.write_all_to(settings, tmp_file.as_write())?; + temporary_files.push(tmp_file.finished_writing()?); + } + } + // Merge any remaining files that didn't get merged in a full batch above. + if !batch.is_empty() { + assert!(batch.len() < settings.merge_batch_size); + let merger = merge_without_limit(batch.into_iter(), settings)?; + let mut tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; merger.write_all_to(settings, tmp_file.as_write())?; temporary_files.push(tmp_file.finished_writing()?); } - assert!(batches.next().is_none()); merge_with_file_limit::<_, _, Tmp>( temporary_files .into_iter() @@ -127,10 +131,9 @@ pub fn merge_with_file_limit< dyn FnMut(Tmp::Closed) -> UResult<::Reopened>, >), settings, + output, tmp_dir, ) - } else { - merge_without_limit(files, settings) } } @@ -260,7 +263,7 @@ struct PreviousLine { } /// Merges files together. This is **not** an iterator because of lifetime problems. -pub struct FileMerger<'a> { +struct FileMerger<'a> { heap: binary_heap_plus::BinaryHeap>, request_sender: Sender<(usize, RecycledChunk)>, prev: Option, @@ -269,12 +272,12 @@ pub struct FileMerger<'a> { impl FileMerger<'_> { /// Write the merged contents to the output file. - pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> { + fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> { let mut out = output.into_write(); self.write_all_to(settings, &mut out) } - pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> { + fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> { while self.write_next(settings, out) {} drop(self.request_sender); self.reader_join_handle.join().unwrap() diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index c2e752bdf..8b6fcbb25 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -1567,8 +1567,7 @@ fn exec( tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { if settings.merge { - let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?; - file_merger.write_all(settings, output) + merge::merge(files, settings, output, tmp_dir) } else if settings.check { if files.len() > 1 { Err(UUsageError::new(2, "only one file allowed with -c")) diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index 97bfc6a74..62aa07dae 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -3,7 +3,7 @@ // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore (words) ints +// spell-checker:ignore (words) ints (linux) NOFILE #![allow(clippy::cast_possible_wrap)] use std::time::Duration; @@ -1084,6 +1084,31 @@ fn test_merge_batch_size() { .stdout_only_fixture("merge_ints_interleaved.expected"); } +#[test] +#[cfg(any(target_os = "linux", target_os = "android"))] +fn test_merge_batch_size_with_limit() { + use rlimit::Resource; + // Currently need... + // 3 descriptors for stdin, stdout, stderr + // 2 descriptors for CTRL+C handling logic (to be reworked at some point) + // 2 descriptors for the input files (i.e. batch-size of 2). + let limit_fd = 3 + 2 + 2; + TestScenario::new(util_name!()) + .ucmd() + .limit(Resource::NOFILE, limit_fd, limit_fd) + .arg("--batch-size=2") + .arg("-m") + .arg("--unique") + .arg("merge_ints_interleaved_1.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_1.txt") + .succeeds() + .stdout_only_fixture("merge_ints_interleaved.expected"); +} + #[test] fn test_sigpipe_panic() { let mut cmd = new_ucmd!();