1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-28 03:27:44 +00:00

Merge pull request #2443 from miDeb/sort/data-oriented

sort: separate additional data from the Line struct
This commit is contained in:
Sylvestre Ledru 2021-06-24 21:19:23 +02:00 committed by GitHub
commit fef8761ac1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 362 additions and 232 deletions

View file

@ -8,7 +8,7 @@
//! Check if a file is ordered //! Check if a file is ordered
use crate::{ use crate::{
chunks::{self, Chunk}, chunks::{self, Chunk, RecycledChunk},
compare_by, open, GlobalSettings, compare_by, open, GlobalSettings,
}; };
use itertools::Itertools; use itertools::Itertools;
@ -34,7 +34,7 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
move || reader(file, recycled_receiver, loaded_sender, &settings) move || reader(file, recycled_receiver, loaded_sender, &settings)
}); });
for _ in 0..2 { for _ in 0..2 {
let _ = recycled_sender.send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new())); let _ = recycled_sender.send(RecycledChunk::new(100 * 1024));
} }
let mut prev_chunk: Option<Chunk> = None; let mut prev_chunk: Option<Chunk> = None;
@ -44,21 +44,29 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
if let Some(prev_chunk) = prev_chunk.take() { if let Some(prev_chunk) = prev_chunk.take() {
// Check if the first element of the new chunk is greater than the last // Check if the first element of the new chunk is greater than the last
// element from the previous chunk // element from the previous chunk
let prev_last = prev_chunk.borrow_lines().last().unwrap(); let prev_last = prev_chunk.lines().last().unwrap();
let new_first = chunk.borrow_lines().first().unwrap(); let new_first = chunk.lines().first().unwrap();
if compare_by(prev_last, new_first, settings) == Ordering::Greater { if compare_by(
prev_last,
new_first,
settings,
prev_chunk.line_data(),
chunk.line_data(),
) == Ordering::Greater
{
if !settings.check_silent { if !settings.check_silent {
println!("sort: {}:{}: disorder: {}", path, line_idx, new_first.line); println!("sort: {}:{}: disorder: {}", path, line_idx, new_first.line);
} }
return 1; return 1;
} }
let _ = recycled_sender.send(prev_chunk); let _ = recycled_sender.send(prev_chunk.recycle());
} }
for (a, b) in chunk.borrow_lines().iter().tuple_windows() { for (a, b) in chunk.lines().iter().tuple_windows() {
line_idx += 1; line_idx += 1;
if compare_by(a, b, settings) == Ordering::Greater { if compare_by(a, b, settings, chunk.line_data(), chunk.line_data()) == Ordering::Greater
{
if !settings.check_silent { if !settings.check_silent {
println!("sort: {}:{}: disorder: {}", path, line_idx, b.line); println!("sort: {}:{}: disorder: {}", path, line_idx, b.line);
} }
@ -74,16 +82,15 @@ pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
/// The function running on the reader thread. /// The function running on the reader thread.
fn reader( fn reader(
mut file: Box<dyn Read + Send>, mut file: Box<dyn Read + Send>,
receiver: Receiver<Chunk>, receiver: Receiver<RecycledChunk>,
sender: SyncSender<Chunk>, sender: SyncSender<Chunk>,
settings: &GlobalSettings, settings: &GlobalSettings,
) { ) {
let mut carry_over = vec![]; let mut carry_over = vec![];
for chunk in receiver.iter() { for recycled_chunk in receiver.iter() {
let (recycled_lines, recycled_buffer) = chunk.recycle();
let should_continue = chunks::read( let should_continue = chunks::read(
&sender, &sender,
recycled_buffer, recycled_chunk,
None, None,
&mut carry_over, &mut carry_over,
&mut file, &mut file,
@ -93,7 +100,6 @@ fn reader(
} else { } else {
b'\n' b'\n'
}, },
recycled_lines,
settings, settings,
); );
if !should_continue { if !should_continue {

View file

@ -15,7 +15,7 @@ use std::{
use memchr::memchr_iter; use memchr::memchr_iter;
use ouroboros::self_referencing; use ouroboros::self_referencing;
use crate::{GlobalSettings, Line}; use crate::{numeric_str_cmp::NumInfo, GeneralF64ParseResult, GlobalSettings, Line};
/// The chunk that is passed around between threads. /// The chunk that is passed around between threads.
/// `lines` consist of slices into `buffer`. /// `lines` consist of slices into `buffer`.
@ -25,28 +25,87 @@ pub struct Chunk {
pub buffer: Vec<u8>, pub buffer: Vec<u8>,
#[borrows(buffer)] #[borrows(buffer)]
#[covariant] #[covariant]
pub lines: Vec<Line<'this>>, pub contents: ChunkContents<'this>,
}
#[derive(Debug)]
pub struct ChunkContents<'a> {
pub lines: Vec<Line<'a>>,
pub line_data: LineData<'a>,
}
#[derive(Debug)]
pub struct LineData<'a> {
pub selections: Vec<&'a str>,
pub num_infos: Vec<NumInfo>,
pub parsed_floats: Vec<GeneralF64ParseResult>,
} }
impl Chunk { impl Chunk {
/// Destroy this chunk and return its components to be reused. /// Destroy this chunk and return its components to be reused.
/// pub fn recycle(mut self) -> RecycledChunk {
/// # Returns let recycled_contents = self.with_contents_mut(|contents| {
/// contents.lines.clear();
/// * The `lines` vector, emptied contents.line_data.selections.clear();
/// * The `buffer` vector, **not** emptied contents.line_data.num_infos.clear();
pub fn recycle(mut self) -> (Vec<Line<'static>>, Vec<u8>) { contents.line_data.parsed_floats.clear();
let recycled_lines = self.with_lines_mut(|lines| { let lines = unsafe {
lines.clear();
unsafe {
// SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime, // SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime,
// because the vector is empty. // because the vector is empty.
// Transmuting is necessary to make recycling possible. See https://github.com/rust-lang/rfcs/pull/2802 // Transmuting is necessary to make recycling possible. See https://github.com/rust-lang/rfcs/pull/2802
// for a rfc to make this unnecessary. Its example is similar to the code here. // for a rfc to make this unnecessary. Its example is similar to the code here.
std::mem::transmute::<Vec<Line<'_>>, Vec<Line<'static>>>(std::mem::take(lines)) std::mem::transmute::<Vec<Line<'_>>, Vec<Line<'static>>>(std::mem::take(
} &mut contents.lines,
))
};
let selections = unsafe {
// SAFETY: (same as above) It is safe to (temporarily) transmute to a vector of &str with a longer lifetime,
// because the vector is empty.
std::mem::transmute::<Vec<&'_ str>, Vec<&'static str>>(std::mem::take(
&mut contents.line_data.selections,
))
};
(
lines,
selections,
std::mem::take(&mut contents.line_data.num_infos),
std::mem::take(&mut contents.line_data.parsed_floats),
)
}); });
(recycled_lines, self.into_heads().buffer) RecycledChunk {
lines: recycled_contents.0,
selections: recycled_contents.1,
num_infos: recycled_contents.2,
parsed_floats: recycled_contents.3,
buffer: self.into_heads().buffer,
}
}
pub fn lines(&self) -> &Vec<Line> {
&self.borrow_contents().lines
}
pub fn line_data(&self) -> &LineData {
&self.borrow_contents().line_data
}
}
pub struct RecycledChunk {
lines: Vec<Line<'static>>,
selections: Vec<&'static str>,
num_infos: Vec<NumInfo>,
parsed_floats: Vec<GeneralF64ParseResult>,
buffer: Vec<u8>,
}
impl RecycledChunk {
pub fn new(capacity: usize) -> Self {
RecycledChunk {
lines: Vec::new(),
selections: Vec::new(),
num_infos: Vec::new(),
parsed_floats: Vec::new(),
buffer: vec![0; capacity],
}
} }
} }
@ -63,28 +122,32 @@ impl Chunk {
/// (see also `read_to_chunk` for a more detailed documentation) /// (see also `read_to_chunk` for a more detailed documentation)
/// ///
/// * `sender`: The sender to send the lines to the sorter. /// * `sender`: The sender to send the lines to the sorter.
/// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled. /// * `recycled_chunk`: The recycled chunk, as returned by `Chunk::recycle`.
/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`) /// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
/// * `max_buffer_size`: How big `buffer` can be. /// * `max_buffer_size`: How big `buffer` can be.
/// * `carry_over`: The bytes that must be carried over in between invocations. /// * `carry_over`: The bytes that must be carried over in between invocations.
/// * `file`: The current file. /// * `file`: The current file.
/// * `next_files`: What `file` should be updated to next. /// * `next_files`: What `file` should be updated to next.
/// * `separator`: The line separator. /// * `separator`: The line separator.
/// * `lines`: The recycled vector to fill with lines. Must be empty.
/// * `settings`: The global settings. /// * `settings`: The global settings.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn read<T: Read>( pub fn read<T: Read>(
sender: &SyncSender<Chunk>, sender: &SyncSender<Chunk>,
mut buffer: Vec<u8>, recycled_chunk: RecycledChunk,
max_buffer_size: Option<usize>, max_buffer_size: Option<usize>,
carry_over: &mut Vec<u8>, carry_over: &mut Vec<u8>,
file: &mut T, file: &mut T,
next_files: &mut impl Iterator<Item = T>, next_files: &mut impl Iterator<Item = T>,
separator: u8, separator: u8,
lines: Vec<Line<'static>>,
settings: &GlobalSettings, settings: &GlobalSettings,
) -> bool { ) -> bool {
assert!(lines.is_empty()); let RecycledChunk {
lines,
selections,
num_infos,
parsed_floats,
mut buffer,
} = recycled_chunk;
if buffer.len() < carry_over.len() { if buffer.len() < carry_over.len() {
buffer.resize(carry_over.len() + 10 * 1024, 0); buffer.resize(carry_over.len() + 10 * 1024, 0);
} }
@ -101,15 +164,25 @@ pub fn read<T: Read>(
carry_over.extend_from_slice(&buffer[read..]); carry_over.extend_from_slice(&buffer[read..]);
if read != 0 { if read != 0 {
let payload = Chunk::new(buffer, |buf| { let payload = Chunk::new(buffer, |buffer| {
let selections = unsafe {
// SAFETY: It is safe to transmute to an empty vector of selections with shorter lifetime.
// It was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
std::mem::transmute::<Vec<&'static str>, Vec<&'_ str>>(selections)
};
let mut lines = unsafe { let mut lines = unsafe {
// SAFETY: It is safe to transmute to a vector of lines with shorter lifetime, // SAFETY: (same as above) It is safe to transmute to a vector of lines with shorter lifetime,
// because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible. // because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines) std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
}; };
let read = crash_if_err!(1, std::str::from_utf8(&buf[..read])); let read = crash_if_err!(1, std::str::from_utf8(&buffer[..read]));
parse_lines(read, &mut lines, separator, settings); let mut line_data = LineData {
lines selections,
num_infos,
parsed_floats,
};
parse_lines(read, &mut lines, &mut line_data, separator, settings);
ChunkContents { lines, line_data }
}); });
sender.send(payload).unwrap(); sender.send(payload).unwrap();
} }
@ -120,6 +193,7 @@ pub fn read<T: Read>(
fn parse_lines<'a>( fn parse_lines<'a>(
mut read: &'a str, mut read: &'a str,
lines: &mut Vec<Line<'a>>, lines: &mut Vec<Line<'a>>,
line_data: &mut LineData<'a>,
separator: u8, separator: u8,
settings: &GlobalSettings, settings: &GlobalSettings,
) { ) {
@ -128,9 +202,15 @@ fn parse_lines<'a>(
read = &read[..read.len() - 1]; read = &read[..read.len() - 1];
} }
assert!(lines.is_empty());
assert!(line_data.selections.is_empty());
assert!(line_data.num_infos.is_empty());
assert!(line_data.parsed_floats.is_empty());
let mut token_buffer = vec![];
lines.extend( lines.extend(
read.split(separator as char) read.split(separator as char)
.map(|line| Line::create(line, settings)), .enumerate()
.map(|(index, line)| Line::create(line, index, line_data, &mut token_buffer, settings)),
); );
} }

View file

@ -23,15 +23,16 @@ use std::{
use itertools::Itertools; use itertools::Itertools;
use crate::chunks::RecycledChunk;
use crate::merge::ClosedTmpFile; use crate::merge::ClosedTmpFile;
use crate::merge::WriteableCompressedTmpFile; use crate::merge::WriteableCompressedTmpFile;
use crate::merge::WriteablePlainTmpFile; use crate::merge::WriteablePlainTmpFile;
use crate::merge::WriteableTmpFile; use crate::merge::WriteableTmpFile;
use crate::Line;
use crate::{ use crate::{
chunks::{self, Chunk}, chunks::{self, Chunk},
compare_by, merge, output_sorted_lines, sort_by, GlobalSettings, compare_by, merge, sort_by, GlobalSettings,
}; };
use crate::{print_sorted, Line};
use tempfile::TempDir; use tempfile::TempDir;
const START_BUFFER_SIZE: usize = 8_000; const START_BUFFER_SIZE: usize = 8_000;
@ -98,16 +99,39 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
merger.write_all(settings); merger.write_all(settings);
} }
ReadResult::SortedSingleChunk(chunk) => { ReadResult::SortedSingleChunk(chunk) => {
output_sorted_lines(chunk.borrow_lines().iter(), settings); if settings.unique {
print_sorted(
chunk.lines().iter().dedup_by(|a, b| {
compare_by(a, b, settings, chunk.line_data(), chunk.line_data())
== Ordering::Equal
}),
settings,
);
} else {
print_sorted(chunk.lines().iter(), settings);
}
} }
ReadResult::SortedTwoChunks([a, b]) => { ReadResult::SortedTwoChunks([a, b]) => {
let merged_iter = a let merged_iter = a.lines().iter().map(|line| (line, &a)).merge_by(
.borrow_lines() b.lines().iter().map(|line| (line, &b)),
.iter() |(line_a, a), (line_b, b)| {
.merge_by(b.borrow_lines().iter(), |line_a, line_b| { compare_by(line_a, line_b, settings, a.line_data(), b.line_data())
compare_by(line_a, line_b, settings) != Ordering::Greater != Ordering::Greater
}); },
output_sorted_lines(merged_iter, settings); );
if settings.unique {
print_sorted(
merged_iter
.dedup_by(|(line_a, a), (line_b, b)| {
compare_by(line_a, line_b, settings, a.line_data(), b.line_data())
== Ordering::Equal
})
.map(|(line, _)| line),
settings,
);
} else {
print_sorted(merged_iter.map(|(line, _)| line), settings);
}
} }
ReadResult::EmptyInput => { ReadResult::EmptyInput => {
// don't output anything // don't output anything
@ -118,7 +142,9 @@ fn reader_writer<F: Iterator<Item = Box<dyn Read + Send>>, Tmp: WriteableTmpFile
/// The function that is executed on the sorter thread. /// The function that is executed on the sorter thread.
fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) { fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) {
while let Ok(mut payload) = receiver.recv() { while let Ok(mut payload) = receiver.recv() {
payload.with_lines_mut(|lines| sort_by(lines, &settings)); payload.with_contents_mut(|contents| {
sort_by(&mut contents.lines, &settings, &contents.line_data)
});
sender.send(payload).unwrap(); sender.send(payload).unwrap();
} }
} }
@ -154,20 +180,16 @@ fn read_write_loop<I: WriteableTmpFile>(
for _ in 0..2 { for _ in 0..2 {
let should_continue = chunks::read( let should_continue = chunks::read(
&sender, &sender,
vec![ RecycledChunk::new(if START_BUFFER_SIZE < buffer_size {
0; START_BUFFER_SIZE
if START_BUFFER_SIZE < buffer_size { } else {
START_BUFFER_SIZE buffer_size
} else { }),
buffer_size
}
],
Some(buffer_size), Some(buffer_size),
&mut carry_over, &mut carry_over,
&mut file, &mut file,
&mut files, &mut files,
separator, separator,
Vec::new(),
settings, settings,
); );
@ -216,18 +238,17 @@ fn read_write_loop<I: WriteableTmpFile>(
file_number += 1; file_number += 1;
let (recycled_lines, recycled_buffer) = chunk.recycle(); let recycled_chunk = chunk.recycle();
if let Some(sender) = &sender_option { if let Some(sender) = &sender_option {
let should_continue = chunks::read( let should_continue = chunks::read(
sender, sender,
recycled_buffer, recycled_chunk,
None, None,
&mut carry_over, &mut carry_over,
&mut file, &mut file,
&mut files, &mut files,
separator, separator,
recycled_lines,
settings, settings,
); );
if !should_continue { if !should_continue {
@ -245,12 +266,9 @@ fn write<I: WriteableTmpFile>(
compress_prog: Option<&str>, compress_prog: Option<&str>,
separator: u8, separator: u8,
) -> I::Closed { ) -> I::Closed {
chunk.with_lines_mut(|lines| { let mut tmp_file = I::create(file, compress_prog);
// Write the lines to the file write_lines(chunk.lines(), tmp_file.as_write(), separator);
let mut tmp_file = I::create(file, compress_prog); tmp_file.finished_writing()
write_lines(lines, tmp_file.as_write(), separator);
tmp_file.finished_writing()
})
} }
fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) { fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {

View file

@ -24,7 +24,7 @@ use itertools::Itertools;
use tempfile::TempDir; use tempfile::TempDir;
use crate::{ use crate::{
chunks::{self, Chunk}, chunks::{self, Chunk, RecycledChunk},
compare_by, GlobalSettings, compare_by, GlobalSettings,
}; };
@ -125,14 +125,14 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = M>>(
})); }));
// Send the initial chunk to trigger a read for each file // Send the initial chunk to trigger a read for each file
request_sender request_sender
.send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new()))) .send((file_number, RecycledChunk::new(8 * 1024)))
.unwrap(); .unwrap();
} }
// Send the second chunk for each file // Send the second chunk for each file
for file_number in 0..reader_files.len() { for file_number in 0..reader_files.len() {
request_sender request_sender
.send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new()))) .send((file_number, RecycledChunk::new(8 * 1024)))
.unwrap(); .unwrap();
} }
@ -181,13 +181,12 @@ struct ReaderFile<M: MergeInput> {
/// The function running on the reader thread. /// The function running on the reader thread.
fn reader( fn reader(
recycled_receiver: Receiver<(usize, Chunk)>, recycled_receiver: Receiver<(usize, RecycledChunk)>,
files: &mut [Option<ReaderFile<impl MergeInput>>], files: &mut [Option<ReaderFile<impl MergeInput>>],
settings: &GlobalSettings, settings: &GlobalSettings,
separator: u8, separator: u8,
) { ) {
for (file_idx, chunk) in recycled_receiver.iter() { for (file_idx, recycled_chunk) in recycled_receiver.iter() {
let (recycled_lines, recycled_buffer) = chunk.recycle();
if let Some(ReaderFile { if let Some(ReaderFile {
file, file,
sender, sender,
@ -196,13 +195,12 @@ fn reader(
{ {
let should_continue = chunks::read( let should_continue = chunks::read(
sender, sender,
recycled_buffer, recycled_chunk,
None, None,
carry_over, carry_over,
file.as_read(), file.as_read(),
&mut iter::empty(), &mut iter::empty(),
separator, separator,
recycled_lines,
settings, settings,
); );
if !should_continue { if !should_continue {
@ -234,7 +232,7 @@ struct PreviousLine {
/// Merges files together. This is **not** an iterator because of lifetime problems. /// Merges files together. This is **not** an iterator because of lifetime problems.
pub struct FileMerger<'a> { pub struct FileMerger<'a> {
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>, heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
request_sender: Sender<(usize, Chunk)>, request_sender: Sender<(usize, RecycledChunk)>,
prev: Option<PreviousLine>, prev: Option<PreviousLine>,
} }
@ -257,14 +255,16 @@ impl<'a> FileMerger<'a> {
file_number: file.file_number, file_number: file.file_number,
}); });
file.current_chunk.with_lines(|lines| { file.current_chunk.with_contents(|contents| {
let current_line = &lines[file.line_idx]; let current_line = &contents.lines[file.line_idx];
if settings.unique { if settings.unique {
if let Some(prev) = &prev { if let Some(prev) = &prev {
let cmp = compare_by( let cmp = compare_by(
&prev.chunk.borrow_lines()[prev.line_idx], &prev.chunk.lines()[prev.line_idx],
current_line, current_line,
settings, settings,
prev.chunk.line_data(),
file.current_chunk.line_data(),
); );
if cmp == Ordering::Equal { if cmp == Ordering::Equal {
return; return;
@ -274,8 +274,7 @@ impl<'a> FileMerger<'a> {
current_line.print(out, settings); current_line.print(out, settings);
}); });
let was_last_line_for_file = let was_last_line_for_file = file.current_chunk.lines().len() == file.line_idx + 1;
file.current_chunk.borrow_lines().len() == file.line_idx + 1;
if was_last_line_for_file { if was_last_line_for_file {
if let Ok(next_chunk) = file.receiver.recv() { if let Ok(next_chunk) = file.receiver.recv() {
@ -295,7 +294,7 @@ impl<'a> FileMerger<'a> {
// If nothing is referencing the previous chunk anymore, this means that the previous line // If nothing is referencing the previous chunk anymore, this means that the previous line
// was the last line of the chunk. We can recycle the chunk. // was the last line of the chunk. We can recycle the chunk.
self.request_sender self.request_sender
.send((prev.file_number, prev_chunk)) .send((prev.file_number, prev_chunk.recycle()))
.ok(); .ok();
} }
} }
@ -312,9 +311,11 @@ struct FileComparator<'a> {
impl<'a> Compare<MergeableFile> for FileComparator<'a> { impl<'a> Compare<MergeableFile> for FileComparator<'a> {
fn compare(&self, a: &MergeableFile, b: &MergeableFile) -> Ordering { fn compare(&self, a: &MergeableFile, b: &MergeableFile) -> Ordering {
let mut cmp = compare_by( let mut cmp = compare_by(
&a.current_chunk.borrow_lines()[a.line_idx], &a.current_chunk.lines()[a.line_idx],
&b.current_chunk.borrow_lines()[b.line_idx], &b.current_chunk.lines()[b.line_idx],
self.settings, self.settings,
a.current_chunk.line_data(),
b.current_chunk.line_data(),
); );
if cmp == Ordering::Equal { if cmp == Ordering::Equal {
// To make sorting stable, we need to consider the file number as well, // To make sorting stable, we need to consider the file number as well,

View file

@ -23,11 +23,11 @@ mod ext_sort;
mod merge; mod merge;
mod numeric_str_cmp; mod numeric_str_cmp;
use chunks::LineData;
use clap::{crate_version, App, Arg}; use clap::{crate_version, App, Arg};
use custom_str_cmp::custom_str_cmp; use custom_str_cmp::custom_str_cmp;
use ext_sort::ext_sort; use ext_sort::ext_sort;
use fnv::FnvHasher; use fnv::FnvHasher;
use itertools::Itertools;
use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings}; use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings};
use rand::distributions::Alphanumeric; use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@ -170,6 +170,17 @@ pub struct GlobalSettings {
tmp_dir: PathBuf, tmp_dir: PathBuf,
compress_prog: Option<String>, compress_prog: Option<String>,
merge_batch_size: usize, merge_batch_size: usize,
precomputed: Precomputed,
}
/// Data needed for sorting. Should be computed once before starting to sort
/// by calling `GlobalSettings::init_precomputed`.
#[derive(Clone, Debug)]
struct Precomputed {
needs_tokens: bool,
num_infos_per_line: usize,
floats_per_line: usize,
selections_per_line: usize,
} }
impl GlobalSettings { impl GlobalSettings {
@ -210,6 +221,25 @@ impl GlobalSettings {
None => BufWriter::new(Box::new(stdout()) as Box<dyn Write>), None => BufWriter::new(Box::new(stdout()) as Box<dyn Write>),
} }
} }
/// Precompute some data needed for sorting.
/// This function **must** be called before starting to sort, and `GlobalSettings` may not be altered
/// afterwards.
fn init_precomputed(&mut self) {
self.precomputed.needs_tokens = self.selectors.iter().any(|s| s.needs_tokens);
self.precomputed.selections_per_line =
self.selectors.iter().filter(|s| s.needs_selection).count();
self.precomputed.num_infos_per_line = self
.selectors
.iter()
.filter(|s| matches!(s.settings.mode, SortMode::Numeric | SortMode::HumanNumeric))
.count();
self.precomputed.floats_per_line = self
.selectors
.iter()
.filter(|s| matches!(s.settings.mode, SortMode::GeneralNumeric))
.count();
}
} }
impl Default for GlobalSettings { impl Default for GlobalSettings {
@ -237,9 +267,16 @@ impl Default for GlobalSettings {
tmp_dir: PathBuf::new(), tmp_dir: PathBuf::new(),
compress_prog: None, compress_prog: None,
merge_batch_size: 32, merge_batch_size: 32,
precomputed: Precomputed {
num_infos_per_line: 0,
floats_per_line: 0,
selections_per_line: 0,
needs_tokens: false,
},
} }
} }
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
struct KeySettings { struct KeySettings {
mode: SortMode, mode: SortMode,
@ -322,32 +359,10 @@ impl Default for KeySettings {
Self::from(&GlobalSettings::default()) Self::from(&GlobalSettings::default())
} }
} }
enum Selection<'a> {
#[derive(Clone, Debug)]
enum NumCache {
AsF64(GeneralF64ParseResult), AsF64(GeneralF64ParseResult),
WithInfo(NumInfo), WithNumInfo(&'a str, NumInfo),
} Str(&'a str),
impl NumCache {
fn as_f64(&self) -> GeneralF64ParseResult {
match self {
NumCache::AsF64(n) => *n,
_ => unreachable!(),
}
}
fn as_num_info(&self) -> &NumInfo {
match self {
NumCache::WithInfo(n) => n,
_ => unreachable!(),
}
}
}
#[derive(Clone, Debug)]
struct Selection<'a> {
slice: &'a str,
num_cache: Option<Box<NumCache>>,
} }
type Field = Range<usize>; type Field = Range<usize>;
@ -355,31 +370,44 @@ type Field = Range<usize>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Line<'a> { pub struct Line<'a> {
line: &'a str, line: &'a str,
selections: Box<[Selection<'a>]>, index: usize,
} }
impl<'a> Line<'a> { impl<'a> Line<'a> {
fn create(string: &'a str, settings: &GlobalSettings) -> Self { /// Creates a new `Line`.
let fields = if settings ///
/// If additional data is needed for sorting it is added to `line_data`.
/// `token_buffer` allows to reuse the allocation for tokens.
fn create(
line: &'a str,
index: usize,
line_data: &mut LineData<'a>,
token_buffer: &mut Vec<Field>,
settings: &GlobalSettings,
) -> Self {
token_buffer.clear();
if settings.precomputed.needs_tokens {
tokenize(line, settings.separator, token_buffer);
}
for (selector, selection) in settings
.selectors .selectors
.iter() .iter()
.any(|selector| selector.needs_tokens) .map(|selector| (selector, selector.get_selection(line, token_buffer)))
{ {
// Only tokenize if we will need tokens. match selection {
Some(tokenize(string, settings.separator)) Selection::AsF64(parsed_float) => line_data.parsed_floats.push(parsed_float),
} else { Selection::WithNumInfo(str, num_info) => {
None line_data.num_infos.push(num_info);
}; line_data.selections.push(str);
}
Line { Selection::Str(str) => {
line: string, if selector.needs_selection {
selections: settings line_data.selections.push(str)
.selectors }
.iter() }
.filter(|selector| !selector.is_default_selection) }
.map(|selector| selector.get_selection(string, fields.as_deref()))
.collect(),
} }
Self { line, index }
} }
fn print(&self, writer: &mut impl Write, settings: &GlobalSettings) { fn print(&self, writer: &mut impl Write, settings: &GlobalSettings) {
@ -408,7 +436,8 @@ impl<'a> Line<'a> {
let line = self.line.replace('\t', ">"); let line = self.line.replace('\t', ">");
writeln!(writer, "{}", line)?; writeln!(writer, "{}", line)?;
let fields = tokenize(self.line, settings.separator); let mut fields = vec![];
tokenize(self.line, settings.separator, &mut fields);
for selector in settings.selectors.iter() { for selector in settings.selectors.iter() {
let mut selection = selector.get_range(self.line, Some(&fields)); let mut selection = selector.get_range(self.line, Some(&fields));
match selector.settings.mode { match selector.settings.mode {
@ -539,51 +568,51 @@ impl<'a> Line<'a> {
} }
} }
/// Tokenize a line into fields. /// Tokenize a line into fields. The result is stored into `token_buffer`.
fn tokenize(line: &str, separator: Option<char>) -> Vec<Field> { fn tokenize(line: &str, separator: Option<char>, token_buffer: &mut Vec<Field>) {
assert!(token_buffer.is_empty());
if let Some(separator) = separator { if let Some(separator) = separator {
tokenize_with_separator(line, separator) tokenize_with_separator(line, separator, token_buffer)
} else { } else {
tokenize_default(line) tokenize_default(line, token_buffer)
} }
} }
/// By default fields are separated by the first whitespace after non-whitespace. /// By default fields are separated by the first whitespace after non-whitespace.
/// Whitespace is included in fields at the start. /// Whitespace is included in fields at the start.
fn tokenize_default(line: &str) -> Vec<Field> { /// The result is stored into `token_buffer`.
let mut tokens = vec![0..0]; fn tokenize_default(line: &str, token_buffer: &mut Vec<Field>) {
token_buffer.push(0..0);
// pretend that there was whitespace in front of the line // pretend that there was whitespace in front of the line
let mut previous_was_whitespace = true; let mut previous_was_whitespace = true;
for (idx, char) in line.char_indices() { for (idx, char) in line.char_indices() {
if char.is_whitespace() { if char.is_whitespace() {
if !previous_was_whitespace { if !previous_was_whitespace {
tokens.last_mut().unwrap().end = idx; token_buffer.last_mut().unwrap().end = idx;
tokens.push(idx..0); token_buffer.push(idx..0);
} }
previous_was_whitespace = true; previous_was_whitespace = true;
} else { } else {
previous_was_whitespace = false; previous_was_whitespace = false;
} }
} }
tokens.last_mut().unwrap().end = line.len(); token_buffer.last_mut().unwrap().end = line.len();
tokens
} }
/// Split between separators. These separators are not included in fields. /// Split between separators. These separators are not included in fields.
fn tokenize_with_separator(line: &str, separator: char) -> Vec<Field> { /// The result is stored into `token_buffer`.
let mut tokens = vec![]; fn tokenize_with_separator(line: &str, separator: char, token_buffer: &mut Vec<Field>) {
let separator_indices = let separator_indices =
line.char_indices() line.char_indices()
.filter_map(|(i, c)| if c == separator { Some(i) } else { None }); .filter_map(|(i, c)| if c == separator { Some(i) } else { None });
let mut start = 0; let mut start = 0;
for sep_idx in separator_indices { for sep_idx in separator_indices {
tokens.push(start..sep_idx); token_buffer.push(start..sep_idx);
start = sep_idx + 1; start = sep_idx + 1;
} }
if start < line.len() { if start < line.len() {
tokens.push(start..line.len()); token_buffer.push(start..line.len());
} }
tokens
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
@ -640,8 +669,10 @@ struct FieldSelector {
to: Option<KeyPosition>, to: Option<KeyPosition>,
settings: KeySettings, settings: KeySettings,
needs_tokens: bool, needs_tokens: bool,
// Whether the selection for each line is going to be the whole line with no NumCache // Whether this selector operates on a sub-slice of a line.
is_default_selection: bool, // Selections are therefore not needed when this selector matches the whole line
// or the sort mode is general-numeric.
needs_selection: bool,
} }
impl Default for FieldSelector { impl Default for FieldSelector {
@ -651,7 +682,7 @@ impl Default for FieldSelector {
to: None, to: None,
settings: Default::default(), settings: Default::default(),
needs_tokens: false, needs_tokens: false,
is_default_selection: true, needs_selection: false,
} }
} }
} }
@ -747,14 +778,12 @@ impl FieldSelector {
Err("invalid character index 0 for the start position of a field".to_string()) Err("invalid character index 0 for the start position of a field".to_string())
} else { } else {
Ok(Self { Ok(Self {
is_default_selection: from.field == 1 needs_selection: (from.field != 1
&& from.char == 1 || from.char != 1
&& to.is_none() || to.is_some()
&& !matches!( || matches!(settings.mode, SortMode::Numeric | SortMode::HumanNumeric)
settings.mode, || from.ignore_blanks)
SortMode::Numeric | SortMode::GeneralNumeric | SortMode::HumanNumeric && !matches!(settings.mode, SortMode::GeneralNumeric),
)
&& !from.ignore_blanks,
needs_tokens: from.field != 1 || from.char == 0 || to.is_some(), needs_tokens: from.field != 1 || from.char == 0 || to.is_some(),
from, from,
to, to,
@ -764,12 +793,16 @@ impl FieldSelector {
} }
/// Get the selection that corresponds to this selector for the line. /// Get the selection that corresponds to this selector for the line.
/// If needs_fields returned false, tokens may be None. /// If needs_fields returned false, tokens may be empty.
fn get_selection<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Selection<'a> { fn get_selection<'a>(&self, line: &'a str, tokens: &[Field]) -> Selection<'a> {
// `get_range` expects `None` when we don't need tokens and would get confused by an empty vector.
let tokens = if self.needs_tokens {
Some(tokens)
} else {
None
};
let mut range = &line[self.get_range(line, tokens)]; let mut range = &line[self.get_range(line, tokens)];
let num_cache = if self.settings.mode == SortMode::Numeric if self.settings.mode == SortMode::Numeric || self.settings.mode == SortMode::HumanNumeric {
|| self.settings.mode == SortMode::HumanNumeric
{
// Parse NumInfo for this number. // Parse NumInfo for this number.
let (info, num_range) = NumInfo::parse( let (info, num_range) = NumInfo::parse(
range, range,
@ -780,24 +813,18 @@ impl FieldSelector {
); );
// Shorten the range to what we need to pass to numeric_str_cmp later. // Shorten the range to what we need to pass to numeric_str_cmp later.
range = &range[num_range]; range = &range[num_range];
Some(Box::new(NumCache::WithInfo(info))) Selection::WithNumInfo(range, info)
} else if self.settings.mode == SortMode::GeneralNumeric { } else if self.settings.mode == SortMode::GeneralNumeric {
// Parse this number as f64, as this is the requirement for general numeric sorting. // Parse this number as f64, as this is the requirement for general numeric sorting.
Some(Box::new(NumCache::AsF64(general_f64_parse( Selection::AsF64(general_f64_parse(&range[get_leading_gen(range)]))
&range[get_leading_gen(range)],
))))
} else { } else {
// This is not a numeric sort, so we don't need a NumCache. // This is not a numeric sort, so we don't need a NumCache.
None Selection::Str(range)
};
Selection {
slice: range,
num_cache,
} }
} }
/// Look up the range in the line that corresponds to this selector. /// Look up the range in the line that corresponds to this selector.
/// If needs_fields returned false, tokens may be None. /// If needs_fields returned false, tokens must be None.
fn get_range<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Range<usize> { fn get_range<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Range<usize> {
enum Resolution { enum Resolution {
// The start index of the resolved character, inclusive // The start index of the resolved character, inclusive
@ -1297,18 +1324,9 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
); );
} }
exec(&files, &settings) settings.init_precomputed();
}
fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &GlobalSettings) { exec(&files, &settings)
if settings.unique {
print_sorted(
iter.dedup_by(|a, b| compare_by(a, b, settings) == Ordering::Equal),
settings,
);
} else {
print_sorted(iter, settings);
}
} }
fn exec(files: &[String], settings: &GlobalSettings) -> i32 { fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
@ -1328,55 +1346,59 @@ fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
0 0
} }
fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings) { fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings, line_data: &LineData<'a>) {
if settings.stable || settings.unique { if settings.stable || settings.unique {
unsorted.par_sort_by(|a, b| compare_by(a, b, settings)) unsorted.par_sort_by(|a, b| compare_by(a, b, settings, line_data, line_data))
} else { } else {
unsorted.par_sort_unstable_by(|a, b| compare_by(a, b, settings)) unsorted.par_sort_unstable_by(|a, b| compare_by(a, b, settings, line_data, line_data))
} }
} }
fn compare_by<'a>(a: &Line<'a>, b: &Line<'a>, global_settings: &GlobalSettings) -> Ordering { fn compare_by<'a>(
let mut idx = 0; a: &Line<'a>,
b: &Line<'a>,
global_settings: &GlobalSettings,
a_line_data: &LineData<'a>,
b_line_data: &LineData<'a>,
) -> Ordering {
let mut selection_index = 0;
let mut num_info_index = 0;
let mut parsed_float_index = 0;
for selector in &global_settings.selectors { for selector in &global_settings.selectors {
let mut _selections = None; let (a_str, b_str) = if !selector.needs_selection {
let (a_selection, b_selection) = if selector.is_default_selection {
// We can select the whole line. // We can select the whole line.
// We have to store the selections outside of the if-block so that they live long enough. (a.line, b.line)
_selections = Some((
Selection {
slice: a.line,
num_cache: None,
},
Selection {
slice: b.line,
num_cache: None,
},
));
// Unwrap the selections again, and return references to them.
(
&_selections.as_ref().unwrap().0,
&_selections.as_ref().unwrap().1,
)
} else { } else {
let selections = (&a.selections[idx], &b.selections[idx]); let selections = (
idx += 1; a_line_data.selections
[a.index * global_settings.precomputed.selections_per_line + selection_index],
b_line_data.selections
[b.index * global_settings.precomputed.selections_per_line + selection_index],
);
selection_index += 1;
selections selections
}; };
let a_str = a_selection.slice;
let b_str = b_selection.slice;
let settings = &selector.settings; let settings = &selector.settings;
let cmp: Ordering = match settings.mode { let cmp: Ordering = match settings.mode {
SortMode::Random => random_shuffle(a_str, b_str, &global_settings.salt), SortMode::Random => random_shuffle(a_str, b_str, &global_settings.salt),
SortMode::Numeric | SortMode::HumanNumeric => numeric_str_cmp( SortMode::Numeric | SortMode::HumanNumeric => {
(a_str, a_selection.num_cache.as_ref().unwrap().as_num_info()), let a_num_info = &a_line_data.num_infos
(b_str, b_selection.num_cache.as_ref().unwrap().as_num_info()), [a.index * global_settings.precomputed.num_infos_per_line + num_info_index];
), let b_num_info = &b_line_data.num_infos
SortMode::GeneralNumeric => general_numeric_compare( [b.index * global_settings.precomputed.num_infos_per_line + num_info_index];
a_selection.num_cache.as_ref().unwrap().as_f64(), num_info_index += 1;
b_selection.num_cache.as_ref().unwrap().as_f64(), numeric_str_cmp((a_str, a_num_info), (b_str, b_num_info))
), }
SortMode::GeneralNumeric => {
let a_float = &a_line_data.parsed_floats
[a.index * global_settings.precomputed.floats_per_line + parsed_float_index];
let b_float = &b_line_data.parsed_floats
[b.index * global_settings.precomputed.floats_per_line + parsed_float_index];
parsed_float_index += 1;
general_numeric_compare(a_float, b_float)
}
SortMode::Month => month_compare(a_str, b_str), SortMode::Month => month_compare(a_str, b_str),
SortMode::Version => version_compare(a_str, b_str), SortMode::Version => version_compare(a_str, b_str),
SortMode::Default => custom_str_cmp( SortMode::Default => custom_str_cmp(
@ -1470,7 +1492,7 @@ fn get_leading_gen(input: &str) -> Range<usize> {
} }
#[derive(Copy, Clone, PartialEq, PartialOrd, Debug)] #[derive(Copy, Clone, PartialEq, PartialOrd, Debug)]
enum GeneralF64ParseResult { pub enum GeneralF64ParseResult {
Invalid, Invalid,
NaN, NaN,
NegInfinity, NegInfinity,
@ -1497,8 +1519,8 @@ fn general_f64_parse(a: &str) -> GeneralF64ParseResult {
/// Compares two floats, with errors and non-numerics assumed to be -inf. /// Compares two floats, with errors and non-numerics assumed to be -inf.
/// Stops coercing at the first non-numeric char. /// Stops coercing at the first non-numeric char.
/// We explicitly need to convert to f64 in this case. /// We explicitly need to convert to f64 in this case.
fn general_numeric_compare(a: GeneralF64ParseResult, b: GeneralF64ParseResult) -> Ordering { fn general_numeric_compare(a: &GeneralF64ParseResult, b: &GeneralF64ParseResult) -> Ordering {
a.partial_cmp(&b).unwrap() a.partial_cmp(b).unwrap()
} }
fn get_rand_string() -> String { fn get_rand_string() -> String {
@ -1646,6 +1668,12 @@ mod tests {
use super::*; use super::*;
fn tokenize_helper(line: &str, separator: Option<char>) -> Vec<Field> {
let mut buffer = vec![];
tokenize(line, separator, &mut buffer);
buffer
}
#[test] #[test]
fn test_get_hash() { fn test_get_hash() {
let a = "Ted".to_string(); let a = "Ted".to_string();
@ -1689,20 +1717,23 @@ mod tests {
#[test] #[test]
fn test_tokenize_fields() { fn test_tokenize_fields() {
let line = "foo bar b x"; let line = "foo bar b x";
assert_eq!(tokenize(line, None), vec![0..3, 3..7, 7..9, 9..14,],); assert_eq!(tokenize_helper(line, None), vec![0..3, 3..7, 7..9, 9..14,],);
} }
#[test] #[test]
fn test_tokenize_fields_leading_whitespace() { fn test_tokenize_fields_leading_whitespace() {
let line = " foo bar b x"; let line = " foo bar b x";
assert_eq!(tokenize(line, None), vec![0..7, 7..11, 11..13, 13..18,]); assert_eq!(
tokenize_helper(line, None),
vec![0..7, 7..11, 11..13, 13..18,]
);
} }
#[test] #[test]
fn test_tokenize_fields_custom_separator() { fn test_tokenize_fields_custom_separator() {
let line = "aaa foo bar b x"; let line = "aaa foo bar b x";
assert_eq!( assert_eq!(
tokenize(line, Some('a')), tokenize_helper(line, Some('a')),
vec![0..0, 1..1, 2..2, 3..9, 10..18,] vec![0..0, 1..1, 2..2, 3..9, 10..18,]
); );
} }
@ -1710,11 +1741,11 @@ mod tests {
#[test] #[test]
fn test_tokenize_fields_trailing_custom_separator() { fn test_tokenize_fields_trailing_custom_separator() {
let line = "a"; let line = "a";
assert_eq!(tokenize(line, Some('a')), vec![0..0]); assert_eq!(tokenize_helper(line, Some('a')), vec![0..0]);
let line = "aa"; let line = "aa";
assert_eq!(tokenize(line, Some('a')), vec![0..0, 1..1]); assert_eq!(tokenize_helper(line, Some('a')), vec![0..0, 1..1]);
let line = "..a..a"; let line = "..a..a";
assert_eq!(tokenize(line, Some('a')), vec![0..2, 3..5]); assert_eq!(tokenize_helper(line, Some('a')), vec![0..2, 3..5]);
} }
#[test] #[test]
@ -1722,13 +1753,7 @@ mod tests {
fn test_line_size() { fn test_line_size() {
// We should make sure to not regress the size of the Line struct because // We should make sure to not regress the size of the Line struct because
// it is unconditional overhead for every line we sort. // it is unconditional overhead for every line we sort.
assert_eq!(std::mem::size_of::<Line>(), 32); assert_eq!(std::mem::size_of::<Line>(), 24);
// These are the fields of Line:
assert_eq!(std::mem::size_of::<&str>(), 16);
assert_eq!(std::mem::size_of::<Box<[Selection]>>(), 16);
// How big is a selection? Constant cost all lines pay when we need selections.
assert_eq!(std::mem::size_of::<Selection>(), 24);
} }
#[test] #[test]