
Merge pull request #2381 from miDeb/sort/merge-improvements

sort: delete temporary files as soon as possible
Terts Diepraam 2021-06-17 00:00:59 +02:00 committed by GitHub
commit ce6d439a1b
6 changed files with 463 additions and 229 deletions

src/uu/sort/src/check.rs

@@ -34,9 +34,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
         move || reader(file, recycled_receiver, loaded_sender, &settings)
     });
     for _ in 0..2 {
-        recycled_sender
-            .send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()))
-            .unwrap();
+        let _ = recycled_sender.send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()));
     }
     let mut prev_chunk: Option<Chunk> = None;
@@ -55,7 +53,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
             }
             return 1;
         }
-        recycled_sender.send(prev_chunk).ok();
+        let _ = recycled_sender.send(prev_chunk);
     }
     for (a, b) in chunk.borrow_lines().iter().tuple_windows() {
@@ -80,12 +78,11 @@ fn reader(
     sender: SyncSender<Chunk>,
     settings: &GlobalSettings,
 ) {
-    let mut sender = Some(sender);
     let mut carry_over = vec![];
     for chunk in receiver.iter() {
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        chunks::read(
-            &mut sender,
+        let should_continue = chunks::read(
+            &sender,
             recycled_buffer,
             None,
             &mut carry_over,
@@ -98,6 +95,9 @@ fn reader(
             },
             recycled_lines,
             settings,
-        )
+        );
+        if !should_continue {
+            break;
+        }
     }
 }
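
Note on the `let _ = ...send(...)` changes above: the checker thread hangs up as soon as it finds unsorted input, so later sends can legitimately fail and must not panic. A minimal, self-contained sketch of the pattern (illustrative names, not the crate's code):

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    let (sender, receiver) = sync_channel::<u32>(1);
    // The consumer hangs up after one value, as `check` does once it
    // has detected unsorted input.
    let handle = thread::spawn(move || receiver.recv().ok());
    for i in 0..10 {
        // After the receiver is dropped, `send` returns `Err(SendError)`;
        // `let _ =` acknowledges that this is expected instead of unwrapping.
        let _ = sender.send(i);
    }
    handle.join().unwrap();
}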

src/uu/sort/src/chunks.rs

@@ -52,17 +52,17 @@ impl Chunk {
 /// Read a chunk, parse lines and send them.
 ///
-/// No empty chunk will be sent. If we reach the end of the input, sender_option
-/// is set to None. If this function however does not set sender_option to None,
-/// it is not guaranteed that there is still input left: If the input fits _exactly_
-/// into a buffer, we will only notice that there's nothing more to read at the next
-/// invocation.
+/// No empty chunk will be sent. If we reach the end of the input, `false` is returned.
+/// However, if this function returns `true`, it is not guaranteed that there is still
+/// input left: If the input fits _exactly_ into a buffer, we will only notice that there's
+/// nothing more to read at the next invocation. In case there is no input left, nothing will
+/// be sent.
 ///
 /// # Arguments
 ///
 /// (see also `read_to_chunk` for a more detailed documentation)
 ///
-/// * `sender_option`: The sender to send the lines to the sorter. If `None`, this function does nothing.
+/// * `sender`: The sender to send the lines to the sorter.
 /// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
 ///   (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
 /// * `max_buffer_size`: How big `buffer` can be.
@@ -73,52 +73,47 @@ impl Chunk {
 /// * `lines`: The recycled vector to fill with lines. Must be empty.
 /// * `settings`: The global settings.
 #[allow(clippy::too_many_arguments)]
-#[allow(clippy::borrowed_box)]
-pub fn read(
-    sender_option: &mut Option<SyncSender<Chunk>>,
+pub fn read<T: Read>(
+    sender: &SyncSender<Chunk>,
     mut buffer: Vec<u8>,
     max_buffer_size: Option<usize>,
     carry_over: &mut Vec<u8>,
-    file: &mut Box<dyn Read + Send>,
-    next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+    file: &mut T,
+    next_files: &mut impl Iterator<Item = T>,
     separator: u8,
     lines: Vec<Line<'static>>,
     settings: &GlobalSettings,
-) {
+) -> bool {
     assert!(lines.is_empty());
-    if let Some(sender) = sender_option {
-        if buffer.len() < carry_over.len() {
-            buffer.resize(carry_over.len() + 10 * 1024, 0);
-        }
-        buffer[..carry_over.len()].copy_from_slice(carry_over);
-        let (read, should_continue) = read_to_buffer(
-            file,
-            next_files,
-            &mut buffer,
-            max_buffer_size,
-            carry_over.len(),
-            separator,
-        );
-        carry_over.clear();
-        carry_over.extend_from_slice(&buffer[read..]);
-        if read != 0 {
-            let payload = Chunk::new(buffer, |buf| {
-                let mut lines = unsafe {
-                    // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
-                    // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
-                    std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
-                };
-                let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
-                parse_lines(read, &mut lines, separator, settings);
-                lines
-            });
-            sender.send(payload).unwrap();
-        }
-        if !should_continue {
-            *sender_option = None;
-        }
+    if buffer.len() < carry_over.len() {
+        buffer.resize(carry_over.len() + 10 * 1024, 0);
     }
+    buffer[..carry_over.len()].copy_from_slice(carry_over);
+    let (read, should_continue) = read_to_buffer(
+        file,
+        next_files,
+        &mut buffer,
+        max_buffer_size,
+        carry_over.len(),
+        separator,
+    );
+    carry_over.clear();
+    carry_over.extend_from_slice(&buffer[read..]);
+    if read != 0 {
+        let payload = Chunk::new(buffer, |buf| {
+            let mut lines = unsafe {
+                // SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
+                // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
+                std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
+            };
+            let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
+            parse_lines(read, &mut lines, separator, settings);
+            lines
+        });
+        sender.send(payload).unwrap();
+    }
+    should_continue
 }

 /// Split `read` into `Line`s, and add them to `lines`.
@@ -165,10 +160,9 @@ fn parse_lines<'a>(
 /// The remaining bytes must be copied to the start of the buffer for the next invocation,
 /// if another invocation is necessary, which is determined by the other return value.
 /// * Whether this function should be called again.
-#[allow(clippy::borrowed_box)]
-fn read_to_buffer(
-    file: &mut Box<dyn Read + Send>,
-    next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
+fn read_to_buffer<T: Read>(
+    file: &mut T,
+    next_files: &mut impl Iterator<Item = T>,
     buffer: &mut Vec<u8>,
     max_buffer_size: Option<usize>,
     start_offset: usize,
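
The rewritten doc comment above pins down a subtle contract: a `true` return does not guarantee more input, because a buffer that was filled exactly is indistinguishable from one with data still behind it. A self-contained analog (illustrative names, not the crate's code) of why callers keep looping until the flag turns `false`:

use std::io::Read;

// Returns the number of bytes read and whether the caller should call again,
// mirroring the `chunks::read`/`read_to_buffer` return contract.
fn read_chunk<T: Read>(file: &mut T, buffer: &mut [u8]) -> (usize, bool) {
    let mut total = 0;
    while total < buffer.len() {
        match file.read(&mut buffer[total..]) {
            Ok(0) => return (total, false), // end of input reached
            Ok(n) => total += n,
            Err(e) => panic!("read failed: {}", e),
        }
    }
    // The buffer was filled exactly: whether input remains is only
    // discovered on the *next* invocation, which may read 0 bytes.
    (total, true)
}

fn main() {
    let mut input: &[u8] = b"line a\nline b\nline c\n";
    let mut buffer = [0u8; 8];
    loop {
        let (n, should_continue) = read_chunk(&mut input, &mut buffer);
        if n != 0 {
            print!("{}", String::from_utf8_lossy(&buffer[..n]));
        }
        if !should_continue {
            break;
        }
    }
}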

src/uu/sort/src/ext_sort.rs

@@ -12,14 +12,10 @@
 //! The buffers for the individual chunks are recycled. There are two buffers.
 use std::cmp::Ordering;
-use std::fs::File;
-use std::io::BufReader;
-use std::io::{BufWriter, Write};
+use std::io::Write;
 use std::path::Path;
-use std::process::Child;
-use std::process::{Command, Stdio};
+use std::path::PathBuf;
 use std::{
-    fs::OpenOptions,
     io::Read,
     sync::mpsc::{Receiver, SyncSender},
     thread,
@@ -27,72 +23,78 @@ use std::{
 use itertools::Itertools;
-use tempfile::TempDir;

+use crate::merge::ClosedTmpFile;
+use crate::merge::WriteableCompressedTmpFile;
+use crate::merge::WriteablePlainTmpFile;
+use crate::merge::WriteableTmpFile;
 use crate::Line;
 use crate::{
     chunks::{self, Chunk},
     compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
 };
+use tempfile::TempDir;

 const START_BUFFER_SIZE: usize = 8_000;

 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings: &GlobalSettings) {
-    let tmp_dir = crash_if_err!(
-        1,
-        tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(&settings.tmp_dir)
-    );
     let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
     let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
     thread::spawn({
         let settings = settings.clone();
         move || sorter(recycled_receiver, sorted_sender, settings)
     });
-    let read_result = reader_writer(
-        files,
-        &tmp_dir,
-        if settings.zero_terminated {
-            b'\0'
-        } else {
-            b'\n'
-        },
-        // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
-        // around settings.buffer_size as a whole.
-        settings.buffer_size / 10,
-        settings.clone(),
-        sorted_receiver,
-        recycled_sender,
-    );
+    if settings.compress_prog.is_some() {
+        reader_writer::<_, WriteableCompressedTmpFile>(
+            files,
+            settings,
+            sorted_receiver,
+            recycled_sender,
+        );
+    } else {
+        reader_writer::<_, WriteablePlainTmpFile>(
+            files,
+            settings,
+            sorted_receiver,
+            recycled_sender,
+        );
+    }
+}
+
+fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile + 'static>(
+    files: F,
+    settings: &GlobalSettings,
+    receiver: Receiver<Chunk>,
+    sender: SyncSender<Chunk>,
+) {
+    let separator = if settings.zero_terminated {
+        b'\0'
+    } else {
+        b'\n'
+    };
+    // Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
+    // around settings.buffer_size as a whole.
+    let buffer_size = settings.buffer_size / 10;
+    let read_result: ReadResult<Tmp> = read_write_loop(
+        files,
+        &settings.tmp_dir,
+        separator,
+        buffer_size,
+        settings,
+        receiver,
+        sender,
+    );
     match read_result {
-        ReadResult::WroteChunksToFile { chunks_written } => {
-            let mut children = Vec::new();
-            let files = (0..chunks_written).map(|chunk_num| {
-                let file_path = tmp_dir.path().join(chunk_num.to_string());
-                let file = File::open(file_path).unwrap();
-                if let Some(compress_prog) = &settings.compress_prog {
-                    let mut command = Command::new(compress_prog);
-                    command.stdin(file).stdout(Stdio::piped()).arg("-d");
-                    let mut child = crash_if_err!(
-                        2,
-                        command.spawn().map_err(|err| format!(
-                            "couldn't execute compress program: errno {}",
-                            err.raw_os_error().unwrap()
-                        ))
-                    );
-                    let child_stdout = child.stdout.take().unwrap();
-                    children.push(child);
-                    Box::new(BufReader::new(child_stdout)) as Box<dyn Read + Send>
-                } else {
-                    Box::new(BufReader::new(file)) as Box<dyn Read + Send>
-                }
-            });
-            let mut merger = merge::merge_with_file_limit(files, settings);
-            for child in children {
-                assert_child_success(child, settings.compress_prog.as_ref().unwrap());
-            }
+        ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
+            let tmp_dir_size = tmp_files.len();
+            let mut merger = merge::merge_with_file_limit::<_, _, Tmp>(
+                tmp_files.into_iter().map(|c| c.reopen()),
+                settings,
+                Some((tmp_dir, tmp_dir_size)),
+            );
             merger.write_all(settings);
         }
         ReadResult::SortedSingleChunk(chunk) => {
@@ -122,7 +124,7 @@ fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) {
 }

 /// Describes how we read the chunks from the input.
-enum ReadResult {
+enum ReadResult<I: WriteableTmpFile> {
     /// The input was empty. Nothing was read.
     EmptyInput,
     /// The input fits into a single Chunk, which was kept in memory.
@@ -131,33 +133,27 @@
     SortedTwoChunks([Chunk; 2]),
     /// The input was read into multiple chunks, which were written to auxiliary files.
     WroteChunksToFile {
-        /// The number of chunks written to auxiliary files.
-        chunks_written: usize,
+        tmp_files: Vec<I::Closed>,
+        tmp_dir: TempDir,
     },
 }

 /// The function that is executed on the reader/writer thread.
-///
-/// # Returns
-/// * The number of chunks read.
-fn reader_writer(
+fn read_write_loop<I: WriteableTmpFile>(
     mut files: impl Iterator<Item = Box<dyn Read + Send>>,
-    tmp_dir: &TempDir,
+    tmp_dir_parent: &Path,
     separator: u8,
     buffer_size: usize,
-    settings: GlobalSettings,
+    settings: &GlobalSettings,
     receiver: Receiver<Chunk>,
     sender: SyncSender<Chunk>,
-) -> ReadResult {
-    let mut sender_option = Some(sender);
+) -> ReadResult<I> {
     let mut file = files.next().unwrap();
     let mut carry_over = vec![];
     // kick things off with two reads
     for _ in 0..2 {
-        chunks::read(
-            &mut sender_option,
+        let should_continue = chunks::read(
+            &sender,
             vec![
                 0;
                 if START_BUFFER_SIZE < buffer_size {
@@ -172,9 +168,11 @@ fn reader_writer(
             &mut files,
             separator,
             Vec::new(),
-            &settings,
+            settings,
         );
-        if sender_option.is_none() {
+
+        if !should_continue {
+            drop(sender);
             // We have already read the whole input. Since we are in our first two reads,
             // this means that we can fit the whole input into memory. Bypass writing below and
             // handle this case in a more straightforward way.
@@ -190,68 +188,69 @@ fn reader_writer(
         }
     }

+    let tmp_dir = crash_if_err!(
+        1,
+        tempfile::Builder::new()
+            .prefix("uutils_sort")
+            .tempdir_in(tmp_dir_parent)
+    );
+
+    let mut sender_option = Some(sender);
     let mut file_number = 0;
+    let mut tmp_files = vec![];
     loop {
         let mut chunk = match receiver.recv() {
             Ok(it) => it,
             _ => {
-                return ReadResult::WroteChunksToFile {
-                    chunks_written: file_number,
-                }
+                return ReadResult::WroteChunksToFile { tmp_files, tmp_dir };
             }
         };
-        write(
+        let tmp_file = write::<I>(
             &mut chunk,
-            &tmp_dir.path().join(file_number.to_string()),
+            tmp_dir.path().join(file_number.to_string()),
             settings.compress_prog.as_deref(),
             separator,
         );
+        tmp_files.push(tmp_file);
         file_number += 1;
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        chunks::read(
-            &mut sender_option,
-            recycled_buffer,
-            None,
-            &mut carry_over,
-            &mut file,
-            &mut files,
-            separator,
-            recycled_lines,
-            &settings,
-        );
+        if let Some(sender) = &sender_option {
+            let should_continue = chunks::read(
+                sender,
+                recycled_buffer,
+                None,
+                &mut carry_over,
+                &mut file,
+                &mut files,
+                separator,
+                recycled_lines,
+                settings,
+            );
+            if !should_continue {
+                sender_option = None;
+            }
+        }
     }
 }

 /// Write the lines in `chunk` to `file`, separated by `separator`.
 /// `compress_prog` is used to optionally compress file contents.
-fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) {
+fn write<I: WriteableTmpFile>(
+    chunk: &mut Chunk,
+    file: PathBuf,
+    compress_prog: Option<&str>,
+    separator: u8,
+) -> I::Closed {
     chunk.with_lines_mut(|lines| {
         // Write the lines to the file
-        let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
-        if let Some(compress_prog) = compress_prog {
-            let mut command = Command::new(compress_prog);
-            command.stdin(Stdio::piped()).stdout(file);
-            let mut child = crash_if_err!(
-                2,
-                command.spawn().map_err(|err| format!(
-                    "couldn't execute compress program: errno {}",
-                    err.raw_os_error().unwrap()
-                ))
-            );
-            let mut writer = BufWriter::new(child.stdin.take().unwrap());
-            write_lines(lines, &mut writer, separator);
-            writer.flush().unwrap();
-            drop(writer);
-            assert_child_success(child, compress_prog);
-        } else {
-            let mut writer = BufWriter::new(file);
-            write_lines(lines, &mut writer, separator);
-        };
-    });
+        let mut tmp_file = I::create(file, compress_prog);
+        write_lines(lines, tmp_file.as_write(), separator);
+        tmp_file.finished_writing()
+    })
 }

 fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
@@ -260,12 +259,3 @@ fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8)
         crash_if_err!(1, writer.write_all(&[separator]));
     }
 }
-
-fn assert_child_success(mut child: Child, program: &str) {
-    if !matches!(
-        child.wait().map(|e| e.code()),
-        Ok(Some(0)) | Ok(None) | Err(_)
-    ) {
-        crash!(2, "'{}' terminated abnormally", program)
-    }
-}
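
`ext_sort` now branches once on `settings.compress_prog` and hands control to a monomorphized `reader_writer::<_, Tmp>`, so the per-chunk hot path is statically dispatched for both temp-file kinds. A minimal sketch of that dispatch pattern (hypothetical trait and types, not this crate's API):

use std::io::Write;

trait TmpFile {
    fn create() -> Self;
    fn write_chunk(&mut self, data: &[u8]);
}

struct Plain(Vec<u8>);
struct Compressed(Vec<u8>);

impl TmpFile for Plain {
    fn create() -> Self { Plain(Vec::new()) }
    fn write_chunk(&mut self, data: &[u8]) { self.0.write_all(data).unwrap(); }
}

impl TmpFile for Compressed {
    fn create() -> Self { Compressed(Vec::new()) }
    fn write_chunk(&mut self, data: &[u8]) {
        // Stand-in for piping the data through an external compressor.
        self.0.write_all(data).unwrap();
    }
}

// Generic over the temp-file kind: compiled separately for each concrete type.
fn run<T: TmpFile>(data: &[u8]) {
    let mut tmp = T::create();
    tmp.write_chunk(data);
}

fn main() {
    // One runtime branch picks the concrete type, like the
    // `compress_prog.is_some()` branch above.
    if std::env::args().any(|arg| arg == "--compress") {
        run::<Compressed>(b"chunk");
    } else {
        run::<Plain>(b"chunk");
    }
}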

src/uu/sort/src/merge.rs

@@ -9,9 +9,11 @@
 use std::{
     cmp::Ordering,
-    fs::File,
+    fs::{self, File},
     io::{BufWriter, Read, Write},
     iter,
+    path::PathBuf,
+    process::{Child, ChildStdin, ChildStdout, Command, Stdio},
     rc::Rc,
     sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
     thread,
@@ -19,61 +21,94 @@ use std::{
 use compare::Compare;
 use itertools::Itertools;
+use tempfile::TempDir;

 use crate::{
     chunks::{self, Chunk},
     compare_by, GlobalSettings,
 };

-// Merge already sorted files.
-pub fn merge_with_file_limit<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+/// Merge pre-sorted `Box<dyn Read>`s.
+///
+/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
+/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
+pub fn merge<Files: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
+    files: Files,
+    settings: &GlobalSettings,
+) -> FileMerger {
+    if settings.compress_prog.is_none() {
+        merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
+            files.map(|file| PlainMergeInput { inner: file }),
+            settings,
+            None,
+        )
+    } else {
+        merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
+            files.map(|file| PlainMergeInput { inner: file }),
+            settings,
+            None,
+        )
+    }
+}
+
+// Merge already sorted `MergeInput`s.
+pub fn merge_with_file_limit<
+    M: MergeInput + 'static,
+    F: ExactSizeIterator<Item = M>,
+    Tmp: WriteableTmpFile + 'static,
+>(
     files: F,
     settings: &GlobalSettings,
+    tmp_dir: Option<(TempDir, usize)>,
 ) -> FileMerger {
     if files.len() > settings.merge_batch_size {
-        let tmp_dir = tempfile::Builder::new()
-            .prefix("uutils_sort")
-            .tempdir_in(&settings.tmp_dir)
-            .unwrap();
-        let mut batch_number = 0;
+        // If we did not get a tmp_dir, create one.
+        let (tmp_dir, mut tmp_dir_size) = tmp_dir.unwrap_or_else(|| {
+            (
+                tempfile::Builder::new()
+                    .prefix("uutils_sort")
+                    .tempdir_in(&settings.tmp_dir)
+                    .unwrap(),
+                0,
+            )
+        });
         let mut remaining_files = files.len();
         let batches = files.chunks(settings.merge_batch_size);
         let mut batches = batches.into_iter();
-        while batch_number + remaining_files > settings.merge_batch_size && remaining_files != 0 {
+        let mut temporary_files = vec![];
+        while remaining_files != 0 {
+            // Work around the fact that `Chunks` is not an `ExactSizeIterator`.
             remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
             let mut merger = merge_without_limit(batches.next().unwrap(), settings);
-            let tmp_file = File::create(tmp_dir.path().join(batch_number.to_string())).unwrap();
-            merger.write_all_to(settings, &mut BufWriter::new(tmp_file));
-            batch_number += 1;
-        }
-        let batch_files = (0..batch_number).map(|n| {
-            Box::new(File::open(tmp_dir.path().join(n.to_string())).unwrap())
-                as Box<dyn Read + Send>
-        });
-        if batch_number > settings.merge_batch_size {
-            assert!(batches.next().is_none());
-            merge_with_file_limit(
-                Box::new(batch_files) as Box<dyn ExactSizeIterator<Item = Box<dyn Read + Send>>>,
-                settings,
-            )
-        } else {
-            let final_batch = batches.next();
-            assert!(batches.next().is_none());
-            merge_without_limit(
-                batch_files.chain(final_batch.into_iter().flatten()),
-                settings,
-            )
+            let mut tmp_file = Tmp::create(
+                tmp_dir.path().join(tmp_dir_size.to_string()),
+                settings.compress_prog.as_deref(),
+            );
+            tmp_dir_size += 1;
+            merger.write_all_to(settings, tmp_file.as_write());
+            temporary_files.push(tmp_file.finished_writing());
         }
+        assert!(batches.next().is_none());
+        merge_with_file_limit::<_, _, Tmp>(
+            temporary_files
+                .into_iter()
+                .map(Box::new(|c: Tmp::Closed| c.reopen())
+                    as Box<
+                        dyn FnMut(Tmp::Closed) -> <Tmp::Closed as ClosedTmpFile>::Reopened,
+                    >),
+            settings,
+            Some((tmp_dir, tmp_dir_size)),
+        )
     } else {
         merge_without_limit(files, settings)
     }
 }

-/// Merge files without limiting how many files are concurrently open
+/// Merge files without limiting how many files are concurrently open.
 ///
 /// It is the responsibility of the caller to ensure that `files` yields only
 /// as many files as we are allowed to open concurrently.
-fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
+fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
     files: F,
     settings: &GlobalSettings,
 ) -> FileMerger {
@@ -83,16 +118,18 @@ fn merge_without_limit<F: Iterator<Item = Box<dyn Read + Send>>>(
     for (file_number, file) in files.enumerate() {
         let (sender, receiver) = sync_channel(2);
         loaded_receivers.push(receiver);
-        reader_files.push(ReaderFile {
+        reader_files.push(Some(ReaderFile {
             file,
-            sender: Some(sender),
+            sender,
             carry_over: vec![],
-        });
+        }));
+        // Send the initial chunk to trigger a read for each file
         request_sender
             .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
             .unwrap();
     }

+    // Send the second chunk for each file
     for file_number in 0..reader_files.len() {
         request_sender
             .send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
@@ -136,37 +173,45 @@
     }
 }

 /// The struct on the reader thread representing an input file
-struct ReaderFile {
-    file: Box<dyn Read + Send>,
-    sender: Option<SyncSender<Chunk>>,
+struct ReaderFile<M: MergeInput> {
+    file: M,
+    sender: SyncSender<Chunk>,
     carry_over: Vec<u8>,
 }

 /// The function running on the reader thread.
 fn reader(
     recycled_receiver: Receiver<(usize, Chunk)>,
-    files: &mut [ReaderFile],
+    files: &mut [Option<ReaderFile<impl MergeInput>>],
     settings: &GlobalSettings,
     separator: u8,
 ) {
     for (file_idx, chunk) in recycled_receiver.iter() {
         let (recycled_lines, recycled_buffer) = chunk.recycle();
-        let ReaderFile {
+        if let Some(ReaderFile {
             file,
             sender,
             carry_over,
-        } = &mut files[file_idx];
-        chunks::read(
-            sender,
-            recycled_buffer,
-            None,
-            carry_over,
-            file,
-            &mut iter::empty(),
-            separator,
-            recycled_lines,
-            settings,
-        );
+        }) = &mut files[file_idx]
+        {
+            let should_continue = chunks::read(
+                sender,
+                recycled_buffer,
+                None,
+                carry_over,
+                file.as_read(),
+                &mut iter::empty(),
+                separator,
+                recycled_lines,
+                settings,
+            );
+            if !should_continue {
+                // Remove the file from the list by replacing it with `None`.
+                let ReaderFile { file, .. } = files[file_idx].take().unwrap();
+                // Depending on the kind of the `MergeInput`, this may delete the file:
+                file.finished_reading();
+            }
+        }
     }
 }

@@ -241,11 +286,14 @@ impl<'a> FileMerger<'a> {
                     self.heap.pop();
                 }
             } else {
+                // This will cause the comparison to use a different line and the heap to readjust.
                 self.heap.peek_mut().unwrap().line_idx += 1;
             }
             if let Some(prev) = prev {
                 if let Ok(prev_chunk) = Rc::try_unwrap(prev.chunk) {
+                    // If nothing is referencing the previous chunk anymore, this means that the previous line
+                    // was the last line of the chunk. We can recycle the chunk.
                     self.request_sender
                         .send((prev.file_number, prev_chunk))
                         .ok();
@@ -273,7 +321,195 @@ impl<'a> Compare<MergeableFile> for FileComparator<'a> {
             // as lines from a file with a lower number are to be considered "earlier".
             cmp = a.file_number.cmp(&b.file_number);
         }
-        // Our BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
+        // BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
         cmp.reverse()
     }
 }
+
+// Wait for the child to exit and check its exit code.
+fn assert_child_success(mut child: Child, program: &str) {
+    if !matches!(
+        child.wait().map(|e| e.code()),
+        Ok(Some(0)) | Ok(None) | Err(_)
+    ) {
+        crash!(2, "'{}' terminated abnormally", program)
+    }
+}
+
+/// A temporary file that can be written to.
+pub trait WriteableTmpFile {
+    type Closed: ClosedTmpFile;
+    type InnerWrite: Write;
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self;
+    /// Closes the temporary file.
+    fn finished_writing(self) -> Self::Closed;
+    fn as_write(&mut self) -> &mut Self::InnerWrite;
+}
+/// A temporary file that is (temporarily) closed, but can be reopened.
+pub trait ClosedTmpFile {
+    type Reopened: MergeInput;
+    /// Reopens the temporary file.
+    fn reopen(self) -> Self::Reopened;
+}
+/// A pre-sorted input for merging.
+pub trait MergeInput: Send {
+    type InnerRead: Read;
+    /// Cleans this `MergeInput` up.
+    /// Implementations may delete the backing file.
+    fn finished_reading(self);
+    fn as_read(&mut self) -> &mut Self::InnerRead;
+}
+
+pub struct WriteablePlainTmpFile {
+    path: PathBuf,
+    file: BufWriter<File>,
+}
+pub struct ClosedPlainTmpFile {
+    path: PathBuf,
+}
+pub struct PlainTmpMergeInput {
+    path: PathBuf,
+    file: File,
+}
+impl WriteableTmpFile for WriteablePlainTmpFile {
+    type Closed = ClosedPlainTmpFile;
+    type InnerWrite = BufWriter<File>;
+
+    fn create(path: PathBuf, _: Option<&str>) -> Self {
+        WriteablePlainTmpFile {
+            file: BufWriter::new(File::create(&path).unwrap()),
+            path,
+        }
+    }
+    fn finished_writing(self) -> Self::Closed {
+        ClosedPlainTmpFile { path: self.path }
+    }
+    fn as_write(&mut self) -> &mut Self::InnerWrite {
+        &mut self.file
+    }
+}
+impl ClosedTmpFile for ClosedPlainTmpFile {
+    type Reopened = PlainTmpMergeInput;
+
+    fn reopen(self) -> Self::Reopened {
+        PlainTmpMergeInput {
+            file: File::open(&self.path).unwrap(),
+            path: self.path,
+        }
+    }
+}
+impl MergeInput for PlainTmpMergeInput {
+    type InnerRead = File;
+
+    fn finished_reading(self) {
+        fs::remove_file(self.path).ok();
+    }
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.file
+    }
+}
+
+pub struct WriteableCompressedTmpFile {
+    path: PathBuf,
+    compress_prog: String,
+    child: Child,
+    child_stdin: BufWriter<ChildStdin>,
+}
+pub struct ClosedCompressedTmpFile {
+    path: PathBuf,
+    compress_prog: String,
+}
+pub struct CompressedTmpMergeInput {
+    path: PathBuf,
+    compress_prog: String,
+    child: Child,
+    child_stdout: ChildStdout,
+}
+impl WriteableTmpFile for WriteableCompressedTmpFile {
+    type Closed = ClosedCompressedTmpFile;
+    type InnerWrite = BufWriter<ChildStdin>;
+
+    fn create(path: PathBuf, compress_prog: Option<&str>) -> Self {
+        let compress_prog = compress_prog.unwrap();
+        let mut command = Command::new(compress_prog);
+        command
+            .stdin(Stdio::piped())
+            .stdout(File::create(&path).unwrap());
+        let mut child = crash_if_err!(
+            2,
+            command.spawn().map_err(|err| format!(
+                "couldn't execute compress program: errno {}",
+                err.raw_os_error().unwrap()
+            ))
+        );
+        let child_stdin = child.stdin.take().unwrap();
+        WriteableCompressedTmpFile {
+            path,
+            compress_prog: compress_prog.to_owned(),
+            child,
+            child_stdin: BufWriter::new(child_stdin),
+        }
+    }
+    fn finished_writing(self) -> Self::Closed {
+        drop(self.child_stdin);
+        assert_child_success(self.child, &self.compress_prog);
+        ClosedCompressedTmpFile {
+            path: self.path,
+            compress_prog: self.compress_prog,
+        }
+    }
+    fn as_write(&mut self) -> &mut Self::InnerWrite {
+        &mut self.child_stdin
+    }
+}
+impl ClosedTmpFile for ClosedCompressedTmpFile {
+    type Reopened = CompressedTmpMergeInput;
+
+    fn reopen(self) -> Self::Reopened {
+        let mut command = Command::new(&self.compress_prog);
+        let file = File::open(&self.path).unwrap();
+        command.stdin(file).stdout(Stdio::piped()).arg("-d");
+        let mut child = crash_if_err!(
+            2,
+            command.spawn().map_err(|err| format!(
+                "couldn't execute compress program: errno {}",
+                err.raw_os_error().unwrap()
+            ))
+        );
+        let child_stdout = child.stdout.take().unwrap();
+        CompressedTmpMergeInput {
+            path: self.path,
+            compress_prog: self.compress_prog,
+            child,
+            child_stdout,
+        }
+    }
+}
+impl MergeInput for CompressedTmpMergeInput {
+    type InnerRead = ChildStdout;
+
+    fn finished_reading(self) {
+        drop(self.child_stdout);
+        assert_child_success(self.child, &self.compress_prog);
+        fs::remove_file(self.path).ok();
+    }
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.child_stdout
+    }
+}
+
+pub struct PlainMergeInput<R: Read + Send> {
+    inner: R,
+}
+impl<R: Read + Send> MergeInput for PlainMergeInput<R> {
+    type InnerRead = R;
+
+    fn finished_reading(self) {}
+    fn as_read(&mut self) -> &mut Self::InnerRead {
+        &mut self.inner
+    }
+}
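
Taken together, the three traits added above encode the temp-file lifecycle as a one-way state machine: writeable, closed (holding only a path, no open file descriptor), reopened for merging, and finally deleted in `finished_reading`. A hedged sketch of the intended call sequence, assuming the traits and the plain (uncompressed) types from this diff are in scope:

use std::io::{Read, Write};

fn roundtrip() {
    let path = std::env::temp_dir().join("uutils_sort_demo_tmp");
    // 1. Write one sorted run to a temporary file.
    let mut tmp = WriteablePlainTmpFile::create(path, None);
    tmp.as_write().write_all(b"a\nb\nc\n").unwrap();
    // 2. Close it: only the path is kept, so no descriptor stays open
    //    while other runs are being written.
    let closed = tmp.finished_writing();
    // 3. Reopen it as a merge input.
    let mut input = closed.reopen();
    let mut contents = String::new();
    input.as_read().read_to_string(&mut contents).unwrap();
    assert_eq!(contents, "a\nb\nc\n");
    // 4. Done merging this run: the backing file is removed immediately,
    //    which is the "as soon as possible" in the commit title.
    input.finished_reading();
}

Each step consumes the previous value, so misuse such as writing after close fails to compile.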

src/uu/sort/src/sort.rs

@@ -236,7 +236,7 @@ impl Default for GlobalSettings {
             buffer_size: DEFAULT_BUF_SIZE,
             tmp_dir: PathBuf::new(),
             compress_prog: None,
-            merge_batch_size: 16,
+            merge_batch_size: 32,
         }
     }
 }
@@ -1313,7 +1313,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) {
 fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
     if settings.merge {
-        let mut file_merger = merge::merge_with_file_limit(files.iter().map(open), settings);
+        let mut file_merger = merge::merge(files.iter().map(open), settings);
         file_merger.write_all(settings);
     } else if settings.check {
         if files.len() > 1 {
@@ -1516,8 +1516,6 @@ fn get_hash<T: Hash>(t: &T) -> u64 {
 }

 fn random_shuffle(a: &str, b: &str, salt: &str) -> Ordering {
-    #![allow(clippy::comparison_chain)]
-
     let da = get_hash(&[a, salt].concat());
     let db = get_hash(&[b, salt].concat());

tests/by-util/test_sort.rs

@@ -897,3 +897,19 @@ fn test_merge_batches() {
         .succeeds()
         .stdout_only_fixture("ext_sort.expected");
 }
+
+#[test]
+fn test_merge_batch_size() {
+    new_ucmd!()
+        .arg("--batch-size=2")
+        .arg("-m")
+        .arg("--unique")
+        .arg("merge_ints_interleaved_1.txt")
+        .arg("merge_ints_interleaved_2.txt")
+        .arg("merge_ints_interleaved_3.txt")
+        .arg("merge_ints_interleaved_3.txt")
+        .arg("merge_ints_interleaved_2.txt")
+        .arg("merge_ints_interleaved_1.txt")
+        .succeeds()
+        .stdout_only_fixture("merge_ints_interleaved.expected");
+}