From 7c9da82b394e0be8b640afb631f7d512a5351aab Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Sun, 6 Jun 2021 13:50:38 +0200 Subject: [PATCH] sort: implement --batch-size --- src/uu/sort/src/ext_sort.rs | 2 +- src/uu/sort/src/merge.rs | 63 ++++++++++++++++++++++++++++++++++--- src/uu/sort/src/sort.rs | 17 +++++++++- tests/by-util/test_sort.rs | 13 ++++++++ 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index 2d8513e9f..91a7ca360 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -89,7 +89,7 @@ pub fn ext_sort(files: &mut impl Iterator>, settings Box::new(BufReader::new(file)) as Box } }); - let mut merger = merge::merge(files, settings); + let mut merger = merge::merge_with_file_limit(files, settings); for child in children { assert_child_success(child, settings.compress_prog.as_ref().unwrap()); } diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index b47c58c08..478b454b6 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -9,7 +9,8 @@ use std::{ cmp::Ordering, - io::{Read, Write}, + fs::File, + io::{BufWriter, Read, Write}, iter, rc::Rc, sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}, @@ -17,6 +18,7 @@ use std::{ }; use compare::Compare; +use itertools::Itertools; use crate::{ chunks::{self, Chunk}, @@ -24,13 +26,60 @@ use crate::{ }; // Merge already sorted files. -pub fn merge>>( +pub fn merge_with_file_limit>>( + files: F, + settings: &GlobalSettings, +) -> FileMerger { + if files.len() > settings.merge_batch_size { + let tmp_dir = tempfile::Builder::new() + .prefix("uutils_sort") + .tempdir_in(&settings.tmp_dir) + .unwrap(); + let mut batch_number = 0; + let mut remaining_files = files.len(); + let batches = files.chunks(settings.merge_batch_size); + let mut batches = batches.into_iter(); + while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 { + remaining_files = remaining_files.saturating_sub(settings.merge_batch_size); + let mut merger = merge_without_limit(batches.next().unwrap(), settings); + let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap(); + merger.write_all_to(settings, &mut BufWriter::new(tmp_file)); + batch_number += 1; + } + let batch_files = (0..batch_number).map(|n| { + Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap()) + as Box + }); + if batch_number > settings.merge_batch_size { + assert!(batches.next().is_none()); + merge_with_file_limit( + Box::new(batch_files) as Box>>, + settings, + ) + } else { + let final_batch = batches.next(); + assert!(batches.next().is_none()); + merge_without_limit( + batch_files.chain(final_batch.into_iter().flatten()), + settings, + ) + } + } else { + merge_without_limit(files, settings) + } +} + +/// Merge files without limiting how many files are concurrently open +/// +/// It is the responsibility of the caller to ensure that `files` yields only +/// as many files as we are allowed to open concurrently. +fn merge_without_limit>>( files: F, settings: &GlobalSettings, ) -> FileMerger { let (request_sender, request_receiver) = channel(); - let mut reader_files = Vec::with_capacity(files.len()); - let mut loaded_receivers = Vec::with_capacity(files.len()); + let mut reader_files = Vec::with_capacity(files.size_hint().0); + let mut loaded_receivers = Vec::with_capacity(files.size_hint().0); for (file_number, file) in files.enumerate() { let (sender, receiver) = sync_channel(2); loaded_receivers.push(receiver); @@ -148,7 +197,11 @@ impl<'a> FileMerger<'a> { /// Write the merged contents to the output file. pub fn write_all(&mut self, settings: &GlobalSettings) { let mut out = settings.out_writer(); - while self.write_next(settings, &mut out) {} + self.write_all_to(settings, &mut out); + } + + pub fn write_all_to(&mut self, settings: &GlobalSettings, out: &mut impl Write) { + while self.write_next(settings, out) {} } fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool { diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 6cdf051c1..70e3325ad 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -96,6 +96,7 @@ static OPT_FILES0_FROM: &str = "files0-from"; static OPT_BUF_SIZE: &str = "buffer-size"; static OPT_TMP_DIR: &str = "temporary-directory"; static OPT_COMPRESS_PROG: &str = "compress-program"; +static OPT_BATCH_SIZE: &str = "batch-size"; static ARG_FILES: &str = "files"; @@ -157,6 +158,7 @@ pub struct GlobalSettings { buffer_size: usize, tmp_dir: PathBuf, compress_prog: Option, + merge_batch_size: usize, } impl GlobalSettings { @@ -226,6 +228,7 @@ impl Default for GlobalSettings { buffer_size: DEFAULT_BUF_SIZE, tmp_dir: PathBuf::new(), compress_prog: None, + merge_batch_size: 16, } } } @@ -1086,6 +1089,12 @@ pub fn uumain(args: impl uucore::Args) -> i32 { .long_help("PROG has to take input from stdin and output to stdout") .value_name("PROG") ) + .arg( + Arg::with_name(OPT_BATCH_SIZE) + .long(OPT_BATCH_SIZE) + .help("Merge at most N_MERGE inputs at once.") + .value_name("N_MERGE") + ) .arg( Arg::with_name(OPT_FILES0_FROM) .long(OPT_FILES0_FROM) @@ -1177,6 +1186,12 @@ pub fn uumain(args: impl uucore::Args) -> i32 { settings.compress_prog = matches.value_of(OPT_COMPRESS_PROG).map(String::from); + if let Some(n_merge) = matches.value_of(OPT_BATCH_SIZE) { + settings.merge_batch_size = n_merge + .parse() + .unwrap_or_else(|_| crash!(2, "invalid --batch-size argument '{}'", n_merge)); + } + settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED); settings.merge = matches.is_present(OPT_MERGE); @@ -1252,7 +1267,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator>, settings: & fn exec(files: &[String], settings: &GlobalSettings) -> i32 { if settings.merge { - let mut file_merger = merge::merge(files.iter().map(open), settings); + let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings); file_merger.write_all(settings); } else if settings.check { if files.len() > 1 { diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index e731d5b1d..75611abfc 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -840,3 +840,16 @@ fn test_compress_fail() { .fails() .stderr_only("sort: couldn't execute compress program: errno 2"); } + +#[test] +fn test_merge_batches() { + new_ucmd!() + .args(&[ + "ext_sort.txt", + "-n", + "-S", + "150B", + ]) + .succeeds() + .stdout_only_fixture("ext_sort.expected"); +}