mirror of
https://github.com/RGBCube/uutils-coreutils
synced 2025-07-29 03:57:44 +00:00
sort: Rework merge batching logic
Fix bug #6944: rework the way batching is done in sort so that it doesn't open more input files than necessary. Previously, the code would always open one extra input file, which caused problems in ulimit scenarios. Also add an additional test case.
This commit is contained in:
parent
5007bf2598
commit
6bdcad32da
4 changed files with 69 additions and 42 deletions
|
@ -98,12 +98,12 @@ fn reader_writer<
|
||||||
)?;
|
)?;
|
||||||
match read_result {
|
match read_result {
|
||||||
ReadResult::WroteChunksToFile { tmp_files } => {
|
ReadResult::WroteChunksToFile { tmp_files } => {
|
||||||
let merger = merge::merge_with_file_limit::<_, _, Tmp>(
|
merge::merge_with_file_limit::<_, _, Tmp>(
|
||||||
tmp_files.into_iter().map(|c| c.reopen()),
|
tmp_files.into_iter().map(|c| c.reopen()),
|
||||||
settings,
|
settings,
|
||||||
|
output,
|
||||||
tmp_dir,
|
tmp_dir,
|
||||||
)?;
|
)?;
|
||||||
merger.write_all(settings, output)?;
|
|
||||||
}
|
}
|
||||||
ReadResult::SortedSingleChunk(chunk) => {
|
ReadResult::SortedSingleChunk(chunk) => {
|
||||||
if settings.unique {
|
if settings.unique {
|
||||||
|
|
|
@ -25,7 +25,6 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use compare::Compare;
|
use compare::Compare;
|
||||||
use itertools::Itertools;
|
|
||||||
use uucore::error::UResult;
|
use uucore::error::UResult;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -67,58 +66,63 @@ fn replace_output_file_in_input_files(
|
||||||
///
|
///
|
||||||
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
|
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
|
||||||
/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
|
/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
|
||||||
pub fn merge<'a>(
|
pub fn merge(
|
||||||
files: &mut [OsString],
|
files: &mut [OsString],
|
||||||
settings: &'a GlobalSettings,
|
settings: &GlobalSettings,
|
||||||
output: Option<&str>,
|
output: Output,
|
||||||
tmp_dir: &mut TmpDirWrapper,
|
tmp_dir: &mut TmpDirWrapper,
|
||||||
) -> UResult<FileMerger<'a>> {
|
) -> UResult<()> {
|
||||||
replace_output_file_in_input_files(files, output, tmp_dir)?;
|
replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?;
|
||||||
|
let files = files
|
||||||
|
.iter()
|
||||||
|
.map(|file| open(file).map(|file| PlainMergeInput { inner: file }));
|
||||||
if settings.compress_prog.is_none() {
|
if settings.compress_prog.is_none() {
|
||||||
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
|
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir)
|
||||||
files
|
|
||||||
.iter()
|
|
||||||
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
|
|
||||||
settings,
|
|
||||||
tmp_dir,
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
|
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir)
|
||||||
files
|
|
||||||
.iter()
|
|
||||||
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
|
|
||||||
settings,
|
|
||||||
tmp_dir,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge already sorted `MergeInput`s.
|
// Merge already sorted `MergeInput`s.
|
||||||
pub fn merge_with_file_limit<
|
pub fn merge_with_file_limit<
|
||||||
'a,
|
|
||||||
M: MergeInput + 'static,
|
M: MergeInput + 'static,
|
||||||
F: ExactSizeIterator<Item = UResult<M>>,
|
F: ExactSizeIterator<Item = UResult<M>>,
|
||||||
Tmp: WriteableTmpFile + 'static,
|
Tmp: WriteableTmpFile + 'static,
|
||||||
>(
|
>(
|
||||||
files: F,
|
files: F,
|
||||||
settings: &'a GlobalSettings,
|
settings: &GlobalSettings,
|
||||||
|
output: Output,
|
||||||
tmp_dir: &mut TmpDirWrapper,
|
tmp_dir: &mut TmpDirWrapper,
|
||||||
) -> UResult<FileMerger<'a>> {
|
) -> UResult<()> {
|
||||||
if files.len() > settings.merge_batch_size {
|
if files.len() <= settings.merge_batch_size {
|
||||||
let mut remaining_files = files.len();
|
let merger = merge_without_limit(files, settings);
|
||||||
let batches = files.chunks(settings.merge_batch_size);
|
merger?.write_all(settings, output)
|
||||||
let mut batches = batches.into_iter();
|
} else {
|
||||||
let mut temporary_files = vec![];
|
let mut temporary_files = vec![];
|
||||||
while remaining_files != 0 {
|
let mut batch = vec![];
|
||||||
// Work around the fact that `Chunks` is not an `ExactSizeIterator`.
|
for file in files {
|
||||||
remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
|
batch.push(file);
|
||||||
let merger = merge_without_limit(batches.next().unwrap(), settings)?;
|
if batch.len() >= settings.merge_batch_size {
|
||||||
|
assert_eq!(batch.len(), settings.merge_batch_size);
|
||||||
|
let merger = merge_without_limit(batch.into_iter(), settings)?;
|
||||||
|
batch = vec![];
|
||||||
|
|
||||||
|
let mut tmp_file =
|
||||||
|
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
|
||||||
|
merger.write_all_to(settings, tmp_file.as_write())?;
|
||||||
|
temporary_files.push(tmp_file.finished_writing()?);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Merge any remaining files that didn't get merged in a full batch above.
|
||||||
|
if !batch.is_empty() {
|
||||||
|
assert!(batch.len() < settings.merge_batch_size);
|
||||||
|
let merger = merge_without_limit(batch.into_iter(), settings)?;
|
||||||
|
|
||||||
let mut tmp_file =
|
let mut tmp_file =
|
||||||
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
|
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
|
||||||
merger.write_all_to(settings, tmp_file.as_write())?;
|
merger.write_all_to(settings, tmp_file.as_write())?;
|
||||||
temporary_files.push(tmp_file.finished_writing()?);
|
temporary_files.push(tmp_file.finished_writing()?);
|
||||||
}
|
}
|
||||||
assert!(batches.next().is_none());
|
|
||||||
merge_with_file_limit::<_, _, Tmp>(
|
merge_with_file_limit::<_, _, Tmp>(
|
||||||
temporary_files
|
temporary_files
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -127,10 +131,9 @@ pub fn merge_with_file_limit<
|
||||||
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
|
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
|
||||||
>),
|
>),
|
||||||
settings,
|
settings,
|
||||||
|
output,
|
||||||
tmp_dir,
|
tmp_dir,
|
||||||
)
|
)
|
||||||
} else {
|
|
||||||
merge_without_limit(files, settings)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,7 +263,7 @@ struct PreviousLine {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Merges files together. This is **not** an iterator because of lifetime problems.
|
/// Merges files together. This is **not** an iterator because of lifetime problems.
|
||||||
pub struct FileMerger<'a> {
|
struct FileMerger<'a> {
|
||||||
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
|
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
|
||||||
request_sender: Sender<(usize, RecycledChunk)>,
|
request_sender: Sender<(usize, RecycledChunk)>,
|
||||||
prev: Option<PreviousLine>,
|
prev: Option<PreviousLine>,
|
||||||
|
@ -269,12 +272,12 @@ pub struct FileMerger<'a> {
|
||||||
|
|
||||||
impl FileMerger<'_> {
|
impl FileMerger<'_> {
|
||||||
/// Write the merged contents to the output file.
|
/// Write the merged contents to the output file.
|
||||||
pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
|
fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
|
||||||
let mut out = output.into_write();
|
let mut out = output.into_write();
|
||||||
self.write_all_to(settings, &mut out)
|
self.write_all_to(settings, &mut out)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
|
fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
|
||||||
while self.write_next(settings, out) {}
|
while self.write_next(settings, out) {}
|
||||||
drop(self.request_sender);
|
drop(self.request_sender);
|
||||||
self.reader_join_handle.join().unwrap()
|
self.reader_join_handle.join().unwrap()
|
||||||
|
|
|
@ -1567,8 +1567,7 @@ fn exec(
|
||||||
tmp_dir: &mut TmpDirWrapper,
|
tmp_dir: &mut TmpDirWrapper,
|
||||||
) -> UResult<()> {
|
) -> UResult<()> {
|
||||||
if settings.merge {
|
if settings.merge {
|
||||||
let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?;
|
merge::merge(files, settings, output, tmp_dir)
|
||||||
file_merger.write_all(settings, output)
|
|
||||||
} else if settings.check {
|
} else if settings.check {
|
||||||
if files.len() > 1 {
|
if files.len() > 1 {
|
||||||
Err(UUsageError::new(2, "only one file allowed with -c"))
|
Err(UUsageError::new(2, "only one file allowed with -c"))
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
// For the full copyright and license information, please view the LICENSE
|
// For the full copyright and license information, please view the LICENSE
|
||||||
// file that was distributed with this source code.
|
// file that was distributed with this source code.
|
||||||
|
|
||||||
// spell-checker:ignore (words) ints
|
// spell-checker:ignore (words) ints (linux) NOFILE
|
||||||
#![allow(clippy::cast_possible_wrap)]
|
#![allow(clippy::cast_possible_wrap)]
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -1084,6 +1084,31 @@ fn test_merge_batch_size() {
|
||||||
.stdout_only_fixture("merge_ints_interleaved.expected");
|
.stdout_only_fixture("merge_ints_interleaved.expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||||
|
fn test_merge_batch_size_with_limit() {
|
||||||
|
use rlimit::Resource;
|
||||||
|
// Currently need...
|
||||||
|
// 3 descriptors for stdin, stdout, stderr
|
||||||
|
// 2 descriptors for CTRL+C handling logic (to be reworked at some point)
|
||||||
|
// 2 descriptors for the input files (i.e. batch-size of 2).
|
||||||
|
let limit_fd = 3 + 2 + 2;
|
||||||
|
TestScenario::new(util_name!())
|
||||||
|
.ucmd()
|
||||||
|
.limit(Resource::NOFILE, limit_fd, limit_fd)
|
||||||
|
.arg("--batch-size=2")
|
||||||
|
.arg("-m")
|
||||||
|
.arg("--unique")
|
||||||
|
.arg("merge_ints_interleaved_1.txt")
|
||||||
|
.arg("merge_ints_interleaved_2.txt")
|
||||||
|
.arg("merge_ints_interleaved_3.txt")
|
||||||
|
.arg("merge_ints_interleaved_3.txt")
|
||||||
|
.arg("merge_ints_interleaved_2.txt")
|
||||||
|
.arg("merge_ints_interleaved_1.txt")
|
||||||
|
.succeeds()
|
||||||
|
.stdout_only_fixture("merge_ints_interleaved.expected");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sigpipe_panic() {
|
fn test_sigpipe_panic() {
|
||||||
let mut cmd = new_ucmd!();
|
let mut cmd = new_ucmd!();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue