1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-30 12:37:49 +00:00

sort: implement --batch-size

This commit is contained in:
Michael Debertol 2021-06-06 13:50:38 +02:00
parent 8d213219c7
commit 7c9da82b39
4 changed files with 88 additions and 7 deletions

View file

@@ -89,7 +89,7 @@ pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings
             Box::new(BufReader::new(file)) as Box<dyn Read + Send>
         }
     });
-    let mut merger = merge::merge(files, settings);
+    let mut merger = merge::merge_with_file_limit(files, settings);
     for child in children {
         assert_child_success(child, settings.compress_prog.as_ref().unwrap());
     }

View file

@@ -9,7 +9,8 @@
 use std::{
     cmp::Ordering,
-    io::{Read, Write},
+    fs::File,
+    io::{BufWriter, Read, Write},
     iter,
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
@@ -17,6 +18,7 @@ use std::{
 };

 use compare::Compare;
+use itertools::Itertools;

 use crate::{
     chunks::{self, Chunk},
@@ -24,13 +26,60 @@ use crate::{
 };

 // Merge already sorted files.
-pub fn merge<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+pub fn merge_with_file_limit<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+    files: F,
+    settings: &GlobalSettings,
+) -> FileMerger {
+    if files.len() > settings.merge_batch_size {
+        let tmp_dir = tempfile::Builder::new()
+            .prefix("uutils_sort")
+            .tempdir_in(&settings.tmp_dir)
+            .unwrap();
+        let mut batch_number = 0;
+        let mut remaining_files = files.len();
+        let batches = files.chunks(settings.merge_batch_size);
+        let mut batches = batches.into_iter();
+        while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 {
+            remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
+            let mut merger = merge_without_limit(batches.next().unwrap(), settings);
+            let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap();
+            merger.write_all_to(settings, &mut BufWriter::new(tmp_file));
+            batch_number += 1;
+        }
+        let batch_files = (0..batch_number).map(|n| {
+            Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap())
+                as Box<dyn Read + Send>
+        });
+        if batch_number > settings.merge_batch_size {
+            assert!(batches.next().is_none());
+            merge_with_file_limit(
+                Box::new(batch_files) as Box<dyn ExactSizeIterator<Item = Box<dyn Read + Send>>>,
+                settings,
+            )
+        } else {
+            let final_batch = batches.next();
+            assert!(batches.next().is_none());
+            merge_without_limit(
+                batch_files.chain(final_batch.into_iter().flatten()),
+                settings,
+            )
+        }
+    } else {
+        merge_without_limit(files, settings)
+    }
+}
+
+/// Merge files without limiting how many files are concurrently open
+///
+/// It is the responsibility of the caller to ensure that `files` yields only
+/// as many files as we are allowed to open concurrently.
+fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
     files: F,
     settings: &GlobalSettings,
 ) -> FileMerger {
     let (request_sender, request_receiver) = channel();
-    let mut reader_files = Vec::with_capacity(files.len());
-    let mut loaded_receivers = Vec::with_capacity(files.len());
+    let mut reader_files = Vec::with_capacity(files.size_hint().0);
+    let mut loaded_receivers = Vec::with_capacity(files.size_hint().0);
     for (file_number, file) in files.enumerate() {
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
@@ -148,7 +197,11 @@ impl<'a> FileMerger<'a> {
     /// Write the merged contents to the output file.
     pub fn write_all(&mut self, settings: &GlobalSettings) {
         let mut out = settings.out_writer();
-        while self.write_next(settings, &mut out) {}
+        self.write_all_to(settings, &mut out);
+    }
+
+    pub fn write_all_to(&mut self, settings: &GlobalSettings, out: &mut impl Write) {
+        while self.write_next(settings, out) {}
     }

     fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool {

View file

@@ -96,6 +96,7 @@ static OPT_FILES0_FROM: &str = "files0-from";
 static OPT_BUF_SIZE: &str = "buffer-size";
 static OPT_TMP_DIR: &str = "temporary-directory";
 static OPT_COMPRESS_PROG: &str = "compress-program";
+static OPT_BATCH_SIZE: &str = "batch-size";

 static ARG_FILES: &str = "files";
@@ -157,6 +158,7 @@ pub struct GlobalSettings {
     buffer_size: usize,
     tmp_dir: PathBuf,
     compress_prog: Option<String>,
+    merge_batch_size: usize,
 }

 impl GlobalSettings {
@@ -226,6 +228,7 @@ impl Default for GlobalSettings {
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
             compress_prog: None,
+            merge_batch_size: 16,
         }
     }
 }
@@ -1086,6 +1089,12 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
                 .long_help("PROG has to take input from stdin and output to stdout")
                 .value_name("PROG")
         )
+        .arg(
+            Arg::with_name(OPT_BATCH_SIZE)
+                .long(OPT_BATCH_SIZE)
+                .help("Merge at most N_MERGE inputs at once.")
+                .value_name("N_MERGE")
+        )
         .arg(
             Arg::with_name(OPT_FILES0_FROM)
                 .long(OPT_FILES0_FROM)
@@ -1177,6 +1186,12 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
     settings.compress_prog = matches.value_of(OPT_COMPRESS_PROG).map(String::from);

+    if let Some(n_merge) = matches.value_of(OPT_BATCH_SIZE) {
+        settings.merge_batch_size = n_merge
+            .parse()
+            .unwrap_or_else(|_| crash!(2, "invalid --batch-size argument '{}'", n_merge));
+    }
+
     settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
     settings.merge = matches.is_present(OPT_MERGE);
@@ -1252,7 +1267,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &
 fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
     if settings.merge {
-        let mut file_merger = merge::merge(files.iter().map(open), settings);
+        let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings);
         file_merger.write_all(settings);
     } else if settings.check {
         if files.len() > 1 {

View file

@@ -840,3 +840,16 @@ fn test_compress_fail() {
         .fails()
         .stderr_only("sort: couldn't execute compress program: errno 2");
 }
+
+#[test]
+fn test_merge_batches() {
+    new_ucmd!()
+        .args(&[
+            "ext_sort.txt",
+            "-n",
+            "-S",
+            "150B",
+        ])
+        .succeeds()
+        .stdout_only_fixture("ext_sort.expected");
+}