1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-29 03:57:44 +00:00

Merge pull request #6957 from karlmcdowall/sort_merge_chunking_rework

sort: Rework merge batching logic
This commit is contained in:
Sylvestre Ledru 2024-12-21 23:19:44 +01:00 committed by GitHub
commit 913d5d413b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 69 additions and 42 deletions

View file

@@ -98,12 +98,12 @@ fn reader_writer<
)?; )?;
match read_result { match read_result {
ReadResult::WroteChunksToFile { tmp_files } => { ReadResult::WroteChunksToFile { tmp_files } => {
let merger = merge::merge_with_file_limit::<_, _, Tmp>( merge::merge_with_file_limit::<_, _, Tmp>(
tmp_files.into_iter().map(|c| c.reopen()), tmp_files.into_iter().map(|c| c.reopen()),
settings, settings,
output,
tmp_dir, tmp_dir,
)?; )?;
merger.write_all(settings, output)?;
} }
ReadResult::SortedSingleChunk(chunk) => { ReadResult::SortedSingleChunk(chunk) => {
if settings.unique { if settings.unique {

View file

@@ -25,7 +25,6 @@ use std::{
}; };
use compare::Compare; use compare::Compare;
use itertools::Itertools;
use uucore::error::UResult; use uucore::error::UResult;
use crate::{ use crate::{
@@ -67,58 +66,63 @@ fn replace_output_file_in_input_files(
/// ///
/// If `settings.merge_batch_size` is less than the length of `files`, intermediate files will be used. /// If `settings.merge_batch_size` is less than the length of `files`, intermediate files will be used.
/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it. /// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
pub fn merge<'a>( pub fn merge(
files: &mut [OsString], files: &mut [OsString],
settings: &'a GlobalSettings, settings: &GlobalSettings,
output: Option<&str>, output: Output,
tmp_dir: &mut TmpDirWrapper, tmp_dir: &mut TmpDirWrapper,
) -> UResult<FileMerger<'a>> { ) -> UResult<()> {
replace_output_file_in_input_files(files, output, tmp_dir)?; replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?;
let files = files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file }));
if settings.compress_prog.is_none() { if settings.compress_prog.is_none() {
merge_with_file_limit::<_, _, WriteablePlainTmpFile>( merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir)
files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
settings,
tmp_dir,
)
} else { } else {
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>( merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir)
files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
settings,
tmp_dir,
)
} }
} }
// Merge already sorted `MergeInput`s. // Merge already sorted `MergeInput`s.
pub fn merge_with_file_limit< pub fn merge_with_file_limit<
'a,
M: MergeInput + 'static, M: MergeInput + 'static,
F: ExactSizeIterator<Item = UResult<M>>, F: ExactSizeIterator<Item = UResult<M>>,
Tmp: WriteableTmpFile + 'static, Tmp: WriteableTmpFile + 'static,
>( >(
files: F, files: F,
settings: &'a GlobalSettings, settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper, tmp_dir: &mut TmpDirWrapper,
) -> UResult<FileMerger<'a>> { ) -> UResult<()> {
if files.len() > settings.merge_batch_size { if files.len() <= settings.merge_batch_size {
let mut remaining_files = files.len(); let merger = merge_without_limit(files, settings);
let batches = files.chunks(settings.merge_batch_size); merger?.write_all(settings, output)
let mut batches = batches.into_iter(); } else {
let mut temporary_files = vec![]; let mut temporary_files = vec![];
while remaining_files != 0 { let mut batch = vec![];
// Work around the fact that `Chunks` is not an `ExactSizeIterator`. for file in files {
remaining_files = remaining_files.saturating_sub(settings.merge_batch_size); batch.push(file);
let merger = merge_without_limit(batches.next().unwrap(), settings)?; if batch.len() >= settings.merge_batch_size {
assert_eq!(batch.len(), settings.merge_batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;
batch = vec![];
let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
}
}
// Merge any remaining files that didn't get merged in a full batch above.
if !batch.is_empty() {
assert!(batch.len() < settings.merge_batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;
let mut tmp_file = let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merger.write_all_to(settings, tmp_file.as_write())?; merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?); temporary_files.push(tmp_file.finished_writing()?);
} }
assert!(batches.next().is_none());
merge_with_file_limit::<_, _, Tmp>( merge_with_file_limit::<_, _, Tmp>(
temporary_files temporary_files
.into_iter() .into_iter()
@@ -127,10 +131,9 @@ pub fn merge_with_file_limit<
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>, dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
>), >),
settings, settings,
output,
tmp_dir, tmp_dir,
) )
} else {
merge_without_limit(files, settings)
} }
} }
@@ -260,7 +263,7 @@ struct PreviousLine {
} }
/// Merges files together. This is **not** an iterator because of lifetime problems. /// Merges files together. This is **not** an iterator because of lifetime problems.
pub struct FileMerger<'a> { struct FileMerger<'a> {
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>, heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
request_sender: Sender<(usize, RecycledChunk)>, request_sender: Sender<(usize, RecycledChunk)>,
prev: Option<PreviousLine>, prev: Option<PreviousLine>,
@@ -269,12 +272,12 @@ pub struct FileMerger<'a> {
impl FileMerger<'_> { impl FileMerger<'_> {
/// Write the merged contents to the output file. /// Write the merged contents to the output file.
pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> { fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
let mut out = output.into_write(); let mut out = output.into_write();
self.write_all_to(settings, &mut out) self.write_all_to(settings, &mut out)
} }
pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> { fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
while self.write_next(settings, out) {} while self.write_next(settings, out) {}
drop(self.request_sender); drop(self.request_sender);
self.reader_join_handle.join().unwrap() self.reader_join_handle.join().unwrap()

View file

@@ -1567,8 +1567,7 @@ fn exec(
tmp_dir: &mut TmpDirWrapper, tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> { ) -> UResult<()> {
if settings.merge { if settings.merge {
let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?; merge::merge(files, settings, output, tmp_dir)
file_merger.write_all(settings, output)
} else if settings.check { } else if settings.check {
if files.len() > 1 { if files.len() > 1 {
Err(UUsageError::new(2, "only one file allowed with -c")) Err(UUsageError::new(2, "only one file allowed with -c"))

View file

@@ -3,7 +3,7 @@
// For the full copyright and license information, please view the LICENSE // For the full copyright and license information, please view the LICENSE
// file that was distributed with this source code. // file that was distributed with this source code.
// spell-checker:ignore (words) ints // spell-checker:ignore (words) ints (linux) NOFILE
#![allow(clippy::cast_possible_wrap)] #![allow(clippy::cast_possible_wrap)]
use std::time::Duration; use std::time::Duration;
@@ -1084,6 +1084,31 @@ fn test_merge_batch_size() {
.stdout_only_fixture("merge_ints_interleaved.expected"); .stdout_only_fixture("merge_ints_interleaved.expected");
} }
#[test]
#[cfg(any(target_os = "linux", target_os = "android"))]
fn test_merge_batch_size_with_limit() {
use rlimit::Resource;
// Currently need...
// 3 descriptors for stdin, stdout, stderr
// 2 descriptors for CTRL+C handling logic (to be reworked at some point)
// 2 descriptors for the input files (i.e. batch-size of 2).
let limit_fd = 3 + 2 + 2;
TestScenario::new(util_name!())
.ucmd()
.limit(Resource::NOFILE, limit_fd, limit_fd)
.arg("--batch-size=2")
.arg("-m")
.arg("--unique")
.arg("merge_ints_interleaved_1.txt")
.arg("merge_ints_interleaved_2.txt")
.arg("merge_ints_interleaved_3.txt")
.arg("merge_ints_interleaved_3.txt")
.arg("merge_ints_interleaved_2.txt")
.arg("merge_ints_interleaved_1.txt")
.succeeds()
.stdout_only_fixture("merge_ints_interleaved.expected");
}
#[test] #[test]
fn test_sigpipe_panic() { fn test_sigpipe_panic() {
let mut cmd = new_ucmd!(); let mut cmd = new_ucmd!();