1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-28 11:37:44 +00:00

Merge pull request #2130 from electricboogie/master

sort: implement --buffer-size and --temporary-directory (external sort)
This commit is contained in:
Sylvestre Ledru 2021-04-28 09:21:14 +02:00 committed by GitHub
commit a37e3181a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 40489 additions and 18 deletions

9
Cargo.lock generated
View file

@ -1353,6 +1353,9 @@ name = "serde"
version = "1.0.125" version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171" checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
dependencies = [
"serde_derive",
]
[[package]] [[package]]
name = "serde_cbor" name = "serde_cbor"
@ -1431,6 +1434,9 @@ name = "smallvec"
version = "1.6.1" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "strsim" name = "strsim"
@ -2363,7 +2369,10 @@ dependencies = [
"rand 0.7.3", "rand 0.7.3",
"rayon", "rayon",
"semver", "semver",
"serde",
"serde_json",
"smallvec 1.6.1", "smallvec 1.6.1",
"tempdir",
"unicode-width", "unicode-width",
"uucore", "uucore",
"uucore_procs", "uucore_procs",

View file

@ -15,16 +15,19 @@ edition = "2018"
path = "src/sort.rs" path = "src/sort.rs"
[dependencies] [dependencies]
serde_json = { version = "1.0.64", default-features = false, features = ["alloc"] }
serde = { version = "1.0", features = ["derive"] }
rayon = "1.5" rayon = "1.5"
rand = "0.7" rand = "0.7"
clap = "2.33" clap = "2.33"
fnv = "1.0.7" fnv = "1.0.7"
itertools = "0.10.0" itertools = "0.10.0"
semver = "0.9.0" semver = "0.9.0"
smallvec = "1.6.1" smallvec = { version="1.6.1", features=["serde"] }
unicode-width = "0.1.8" unicode-width = "0.1.8"
uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] } uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] }
uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" } uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" }
tempdir = "0.3.7"
[[bin]] [[bin]]
name = "sort" name = "sort"

View file

@ -0,0 +1,19 @@
Copyright 2018 Battelle Memorial Institute
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,295 @@
use std::clone::Clone;
use std::cmp::Ordering::Less;
use std::collections::VecDeque;
use std::error::Error;
use std::fs::{File, OpenOptions};
use std::io::SeekFrom::Start;
use std::io::{BufRead, BufReader, BufWriter, Seek, Write};
use std::marker::PhantomData;
use std::path::PathBuf;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json;
use tempdir::TempDir;
use super::{GlobalSettings, Line};
/// Trait for types that can be used by
/// [ExternalSorter](struct.ExternalSorter.html). Must be sortable, cloneable,
/// serializeable, and able to report on it's size
pub trait ExternallySortable: Clone + Serialize + DeserializeOwned {
/// Get the size, in bytes, of this object (used to constrain the buffer
/// used in the external sort).
fn get_size(&self) -> u64;
}
/// Iterator that provides sorted `T`s
pub struct ExtSortedIterator<Line> {
buffers: Vec<VecDeque<Line>>,
chunk_offsets: Vec<u64>,
max_per_chunk: u64,
chunks: u64,
tmp_dir: TempDir,
settings: GlobalSettings,
failed: bool,
}
impl Iterator for ExtSortedIterator<Line>
where
Line: ExternallySortable,
{
type Item = Result<Line, Box<dyn Error>>;
/// # Errors
///
/// This method can fail due to issues reading intermediate sorted chunks
/// from disk, or due to serde deserialization issues
fn next(&mut self) -> Option<Self::Item> {
if self.failed {
return None;
}
// fill up any empty buffers
let mut empty = true;
for chunk_num in 0..self.chunks {
if self.buffers[chunk_num as usize].is_empty() {
let mut f = match File::open(self.tmp_dir.path().join(chunk_num.to_string())) {
Ok(f) => f,
Err(e) => {
self.failed = true;
return Some(Err(Box::new(e)));
}
};
match f.seek(Start(self.chunk_offsets[chunk_num as usize])) {
Ok(_) => (),
Err(e) => {
self.failed = true;
return Some(Err(Box::new(e)));
}
}
let bytes_read =
match fill_buff(&mut self.buffers[chunk_num as usize], f, self.max_per_chunk) {
Ok(bytes_read) => bytes_read,
Err(e) => {
self.failed = true;
return Some(Err(e));
}
};
self.chunk_offsets[chunk_num as usize] += bytes_read;
if !self.buffers[chunk_num as usize].is_empty() {
empty = false;
}
} else {
empty = false;
}
}
if empty {
return None;
}
// find the next record to write
// check is_empty() before unwrap()ing
let mut idx = 0;
for chunk_num in 0..self.chunks as usize {
if !self.buffers[chunk_num].is_empty() {
if self.buffers[idx].is_empty()
|| (super::compare_by)(
self.buffers[chunk_num].front().unwrap(),
self.buffers[idx].front().unwrap(),
&self.settings,
) == Less
{
idx = chunk_num;
}
}
}
// unwrap due to checks above
let r = self.buffers[idx].pop_front().unwrap();
Some(Ok(r))
}
}
/// Perform an external sort on an unsorted stream of incoming data
pub struct ExternalSorter<Line>
where
Line: ExternallySortable,
{
tmp_dir: Option<PathBuf>,
buffer_bytes: u64,
phantom: PhantomData<Line>,
settings: GlobalSettings,
}
impl ExternalSorter<Line>
where
Line: ExternallySortable,
{
/// Create a new `ExternalSorter` with a specified memory buffer and
/// temporary directory
pub fn new(
buffer_bytes: u64,
tmp_dir: Option<PathBuf>,
settings: GlobalSettings,
) -> ExternalSorter<Line> {
ExternalSorter {
buffer_bytes,
tmp_dir,
phantom: PhantomData,
settings,
}
}
/// Sort (based on `compare`) the `T`s provided by `unsorted` and return an
/// iterator
///
/// # Errors
///
/// This method can fail due to issues writing intermediate sorted chunks
/// to disk, or due to serde serialization issues
pub fn sort_by<I>(
&self,
unsorted: I,
settings: GlobalSettings,
) -> Result<ExtSortedIterator<Line>, Box<dyn Error>>
where
I: Iterator<Item = Line>,
{
let tmp_dir = match self.tmp_dir {
Some(ref p) => TempDir::new_in(p, "uutils_sort")?,
None => TempDir::new("uutils_sort")?,
};
// creating the thing we need to return first due to the face that we need to
// borrow tmp_dir and move it out
let mut iter = ExtSortedIterator {
buffers: Vec::new(),
chunk_offsets: Vec::new(),
max_per_chunk: 0,
chunks: 0,
tmp_dir,
settings,
failed: false,
};
{
let mut total_read = 0;
let mut chunk = Vec::new();
// Initial buffer is specified by user
let mut adjusted_buffer_size = self.buffer_bytes;
let (iter_size, _) = unsorted.size_hint();
// make the initial chunks on disk
for seq in unsorted {
let seq_size = seq.get_size();
total_read += seq_size;
// GNU minimum is 16 * (sizeof struct + 2), but GNU uses about
// 1/10 the memory that we do. And GNU even says in the code it may
// not work on small buffer sizes.
//
// The following seems to work pretty well, and has about the same max
// RSS as lower minimum values.
//
let minimum_buffer_size: u64 = iter_size as u64 * seq_size / 8;
adjusted_buffer_size =
// Grow buffer size for a struct/Line larger than buffer
if adjusted_buffer_size < seq_size {
seq_size
} else if adjusted_buffer_size < minimum_buffer_size {
minimum_buffer_size
} else {
adjusted_buffer_size
};
chunk.push(seq);
if total_read >= adjusted_buffer_size {
super::sort_by(&mut chunk, &self.settings);
self.write_chunk(
&iter.tmp_dir.path().join(iter.chunks.to_string()),
&mut chunk,
)?;
chunk.clear();
total_read = 0;
iter.chunks += 1;
}
}
// write the last chunk
if chunk.len() > 0 {
super::sort_by(&mut chunk, &self.settings);
self.write_chunk(
&iter.tmp_dir.path().join(iter.chunks.to_string()),
&mut chunk,
)?;
iter.chunks += 1;
}
// initialize buffers for each chunk
//
// Having a right sized buffer for each chunk for smallish values seems silly to me?
//
// We will have to have the entire iter in memory sometime right?
// Set minimum to the size of the writer buffer, ~8K
//
const MINIMUM_READBACK_BUFFER: u64 = 8200;
let right_sized_buffer = adjusted_buffer_size
.checked_div(iter.chunks)
.unwrap_or(adjusted_buffer_size);
iter.max_per_chunk = if right_sized_buffer > MINIMUM_READBACK_BUFFER {
right_sized_buffer
} else {
MINIMUM_READBACK_BUFFER
};
iter.buffers = vec![VecDeque::new(); iter.chunks as usize];
iter.chunk_offsets = vec![0 as u64; iter.chunks as usize];
for chunk_num in 0..iter.chunks {
let offset = fill_buff(
&mut iter.buffers[chunk_num as usize],
File::open(iter.tmp_dir.path().join(chunk_num.to_string()))?,
iter.max_per_chunk,
)?;
iter.chunk_offsets[chunk_num as usize] = offset;
}
}
Ok(iter)
}
fn write_chunk(&self, file: &PathBuf, chunk: &mut Vec<Line>) -> Result<(), Box<dyn Error>> {
let new_file = OpenOptions::new().create(true).append(true).open(file)?;
let mut buf_write = Box::new(BufWriter::new(new_file)) as Box<dyn Write>;
for s in chunk {
let mut serialized = serde_json::to_string(&s).expect("JSON write error: ");
serialized.push_str("\n");
buf_write.write(serialized.as_bytes())?;
}
buf_write.flush()?;
Ok(())
}
}
fn fill_buff<Line>(
vec: &mut VecDeque<Line>,
file: File,
max_bytes: u64,
) -> Result<u64, Box<dyn Error>>
where
Line: ExternallySortable,
{
let mut total_read = 0;
let mut bytes_read = 0;
for line in BufReader::new(file).lines() {
let line_s = line?;
bytes_read += line_s.len() + 1;
// This is where the bad stuff happens usually
let deserialized: Line = serde_json::from_str(&line_s).expect("JSON read error: ");
total_read += deserialized.get_size();
vec.push_back(deserialized);
if total_read > max_bytes {
break;
}
}
Ok(bytes_read as u64)
}

View file

@ -14,20 +14,21 @@
//! More specifically, exponent can be understood so that the original number is in (1..10)*10^exponent. //! More specifically, exponent can be understood so that the original number is in (1..10)*10^exponent.
//! From that follows the constraints of this algorithm: It is able to compare numbers in ±(1*10^[i64::MIN]..10*10^[i64::MAX]). //! From that follows the constraints of this algorithm: It is able to compare numbers in ±(1*10^[i64::MIN]..10*10^[i64::MAX]).
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, ops::Range}; use std::{cmp::Ordering, ops::Range};
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Clone)]
enum Sign { enum Sign {
Negative, Negative,
Positive, Positive,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct NumInfo { pub struct NumInfo {
exponent: i64, exponent: i64,
sign: Sign, sign: Sign,
} }
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct NumInfoParseSettings { pub struct NumInfoParseSettings {
pub accept_si_units: bool, pub accept_si_units: bool,
pub thousands_separator: Option<char>, pub thousands_separator: Option<char>,

View file

@ -15,9 +15,11 @@
#[macro_use] #[macro_use]
extern crate uucore; extern crate uucore;
mod external_sort;
mod numeric_str_cmp; mod numeric_str_cmp;
use clap::{App, Arg}; use clap::{App, Arg};
use external_sort::{ExternalSorter, ExternallySortable};
use fnv::FnvHasher; use fnv::FnvHasher;
use itertools::Itertools; use itertools::Itertools;
use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings}; use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings};
@ -25,6 +27,7 @@ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use semver::Version; use semver::Version;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::BinaryHeap; use std::collections::BinaryHeap;
@ -35,6 +38,7 @@ use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Lines, Read, Write};
use std::mem::replace; use std::mem::replace;
use std::ops::Range; use std::ops::Range;
use std::path::Path; use std::path::Path;
use std::path::PathBuf;
use unicode_width::UnicodeWidthStr; use unicode_width::UnicodeWidthStr;
use uucore::fs::is_stdin_interactive; // for Iterator::dedup() use uucore::fs::is_stdin_interactive; // for Iterator::dedup()
use uucore::InvalidEncodingHandling; use uucore::InvalidEncodingHandling;
@ -77,6 +81,8 @@ static OPT_RANDOM: &str = "random-sort";
static OPT_ZERO_TERMINATED: &str = "zero-terminated"; static OPT_ZERO_TERMINATED: &str = "zero-terminated";
static OPT_PARALLEL: &str = "parallel"; static OPT_PARALLEL: &str = "parallel";
static OPT_FILES0_FROM: &str = "files0-from"; static OPT_FILES0_FROM: &str = "files0-from";
static OPT_BUF_SIZE: &str = "buffer-size";
static OPT_TMP_DIR: &str = "temporary-directory";
static ARG_FILES: &str = "files"; static ARG_FILES: &str = "files";
@ -86,6 +92,8 @@ static THOUSANDS_SEP: char = ',';
static NEGATIVE: char = '-'; static NEGATIVE: char = '-';
static POSITIVE: char = '+'; static POSITIVE: char = '+';
static DEFAULT_BUF_SIZE: usize = std::usize::MAX;
#[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)] #[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Copy)]
enum SortMode { enum SortMode {
Numeric, Numeric,
@ -95,7 +103,7 @@ enum SortMode {
Version, Version,
Default, Default,
} }
#[derive(Clone)]
struct GlobalSettings { struct GlobalSettings {
mode: SortMode, mode: SortMode,
debug: bool, debug: bool,
@ -116,6 +124,31 @@ struct GlobalSettings {
separator: Option<char>, separator: Option<char>,
threads: String, threads: String,
zero_terminated: bool, zero_terminated: bool,
buffer_size: usize,
tmp_dir: PathBuf,
ext_sort: bool,
}
impl GlobalSettings {
// It's back to do conversions for command line opts!
// Probably want to do through numstrcmp somehow now?
fn human_numeric_convert(a: &str) -> usize {
let num_str = &a[get_leading_gen(a)];
let (_, suf_str) = a.split_at(num_str.len());
let num_usize = num_str
.parse::<usize>()
.expect("Error parsing buffer size: ");
let suf_usize: usize = match suf_str.to_uppercase().as_str() {
// SI Units
"B" => 1usize,
"K" => 1000usize,
"M" => 1000000usize,
"G" => 1000000000usize,
// GNU regards empty human numeric values as K by default
_ => 1000usize,
};
num_usize * suf_usize
}
} }
impl Default for GlobalSettings { impl Default for GlobalSettings {
@ -140,10 +173,13 @@ impl Default for GlobalSettings {
separator: None, separator: None,
threads: String::new(), threads: String::new(),
zero_terminated: false, zero_terminated: false,
buffer_size: DEFAULT_BUF_SIZE,
tmp_dir: PathBuf::new(),
ext_sort: false,
} }
} }
} }
#[derive(Clone)]
struct KeySettings { struct KeySettings {
mode: SortMode, mode: SortMode,
ignore_blanks: bool, ignore_blanks: bool,
@ -168,6 +204,7 @@ impl From<&GlobalSettings> for KeySettings {
} }
} }
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Represents the string selected by a FieldSelector. /// Represents the string selected by a FieldSelector.
enum SelectionRange { enum SelectionRange {
/// If we had to transform this selection, we have to store a new string. /// If we had to transform this selection, we have to store a new string.
@ -199,6 +236,7 @@ impl SelectionRange {
} }
} }
#[derive(Serialize, Deserialize, Clone)]
enum NumCache { enum NumCache {
AsF64(GeneralF64ParseResult), AsF64(GeneralF64ParseResult),
WithInfo(NumInfo), WithInfo(NumInfo),
@ -219,7 +257,7 @@ impl NumCache {
} }
} }
} }
#[derive(Serialize, Deserialize, Clone)]
struct Selection { struct Selection {
range: SelectionRange, range: SelectionRange,
num_cache: NumCache, num_cache: NumCache,
@ -234,12 +272,20 @@ impl Selection {
type Field = Range<usize>; type Field = Range<usize>;
#[derive(Serialize, Deserialize, Clone)]
struct Line { struct Line {
line: String, line: String,
// The common case is not to specify fields. Let's make this fast. // The common case is not to specify fields. Let's make this fast.
selections: SmallVec<[Selection; 1]>, selections: SmallVec<[Selection; 1]>,
} }
impl ExternallySortable for Line {
fn get_size(&self) -> u64 {
// Currently 96 bytes, but that could change, so we get that size here
std::mem::size_of::<Line>() as u64
}
}
impl Line { impl Line {
fn new(line: String, settings: &GlobalSettings) -> Self { fn new(line: String, settings: &GlobalSettings) -> Self {
let fields = if settings let fields = if settings
@ -489,6 +535,7 @@ fn tokenize_with_separator(line: &str, separator: char) -> Vec<Field> {
tokens tokens
} }
#[derive(Clone)]
struct KeyPosition { struct KeyPosition {
/// 1-indexed, 0 is invalid. /// 1-indexed, 0 is invalid.
field: usize, field: usize,
@ -578,7 +625,7 @@ impl KeyPosition {
} }
} }
} }
#[derive(Clone)]
struct FieldSelector { struct FieldSelector {
from: KeyPosition, from: KeyPosition,
to: Option<KeyPosition>, to: Option<KeyPosition>,
@ -912,6 +959,22 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
.takes_value(true) .takes_value(true)
.value_name("NUM_THREADS"), .value_name("NUM_THREADS"),
) )
.arg(
Arg::with_name(OPT_BUF_SIZE)
.short("S")
.long(OPT_BUF_SIZE)
.help("sets the maximum SIZE of each segment in number of sorted items")
.takes_value(true)
.value_name("SIZE"),
)
.arg(
Arg::with_name(OPT_TMP_DIR)
.short("T")
.long(OPT_TMP_DIR)
.help("use DIR for temporaries, not $TMPDIR or /tmp")
.takes_value(true)
.value_name("DIR"),
)
.arg( .arg(
Arg::with_name(OPT_FILES0_FROM) Arg::with_name(OPT_FILES0_FROM)
.long(OPT_FILES0_FROM) .long(OPT_FILES0_FROM)
@ -982,6 +1045,29 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
env::set_var("RAYON_NUM_THREADS", &settings.threads); env::set_var("RAYON_NUM_THREADS", &settings.threads);
} }
if matches.is_present(OPT_BUF_SIZE) {
settings.buffer_size = {
let input = matches
.value_of(OPT_BUF_SIZE)
.map(String::from)
.unwrap_or(format!("{}", DEFAULT_BUF_SIZE));
GlobalSettings::human_numeric_convert(&input)
};
settings.ext_sort = true;
}
if matches.is_present(OPT_TMP_DIR) {
let result = matches
.value_of(OPT_TMP_DIR)
.map(String::from)
.unwrap_or(format!("{}", env::temp_dir().display()));
settings.tmp_dir = PathBuf::from(result);
settings.ext_sort = true;
} else {
settings.tmp_dir = env::temp_dir();
}
settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED); settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
settings.merge = matches.is_present(OPT_MERGE); settings.merge = matches.is_present(OPT_MERGE);
@ -1066,10 +1152,10 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
}); });
} }
exec(files, &settings) exec(files, settings)
} }
fn exec(files: Vec<String>, settings: &GlobalSettings) -> i32 { fn exec(files: Vec<String>, settings: GlobalSettings) -> i32 {
let mut lines = Vec::new(); let mut lines = Vec::new();
let mut file_merger = FileMerger::new(&settings); let mut file_merger = FileMerger::new(&settings);
@ -1105,6 +1191,13 @@ fn exec(files: Vec<String>, settings: &GlobalSettings) -> i32 {
if settings.check { if settings.check {
return exec_check_file(&lines, &settings); return exec_check_file(&lines, &settings);
}
// Only use ext_sorter when we need to.
// Probably faster that we don't create
// an owned value each run
if settings.ext_sort {
lines = ext_sort_by(lines, settings.clone());
} else { } else {
sort_by(&mut lines, &settings); sort_by(&mut lines, &settings);
} }
@ -1112,7 +1205,7 @@ fn exec(files: Vec<String>, settings: &GlobalSettings) -> i32 {
if settings.merge { if settings.merge {
if settings.unique { if settings.unique {
print_sorted( print_sorted(
file_merger.dedup_by(|a, b| compare_by(a, b, settings) == Ordering::Equal), file_merger.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
&settings, &settings,
) )
} else { } else {
@ -1122,7 +1215,7 @@ fn exec(files: Vec<String>, settings: &GlobalSettings) -> i32 {
print_sorted( print_sorted(
lines lines
.into_iter() .into_iter()
.dedup_by(|a, b| compare_by(a, b, settings) == Ordering::Equal), .dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
&settings, &settings,
) )
} else { } else {
@ -1164,11 +1257,25 @@ fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 {
} }
} }
fn sort_by(lines: &mut Vec<Line>, settings: &GlobalSettings) { fn ext_sort_by(unsorted: Vec<Line>, settings: GlobalSettings) -> Vec<Line> {
let external_sorter = ExternalSorter::new(
settings.buffer_size as u64,
Some(settings.tmp_dir.clone()),
settings.clone(),
);
let iter = external_sorter
.sort_by(unsorted.into_iter(), settings.clone())
.unwrap()
.map(|x| x.unwrap())
.collect::<Vec<Line>>();
iter
}
fn sort_by(unsorted: &mut Vec<Line>, settings: &GlobalSettings) {
if settings.stable || settings.unique { if settings.stable || settings.unique {
lines.par_sort_by(|a, b| compare_by(a, b, &settings)) unsorted.par_sort_by(|a, b| compare_by(a, b, &settings))
} else { } else {
lines.par_sort_unstable_by(|a, b| compare_by(a, b, &settings)) unsorted.par_sort_unstable_by(|a, b| compare_by(a, b, &settings))
} }
} }
@ -1189,8 +1296,8 @@ fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering
(b_str, b_selection.num_cache.as_num_info()), (b_str, b_selection.num_cache.as_num_info()),
), ),
SortMode::GeneralNumeric => general_numeric_compare( SortMode::GeneralNumeric => general_numeric_compare(
a_selection.num_cache.as_f64(), general_f64_parse(&a_str[get_leading_gen(a_str)]),
b_selection.num_cache.as_f64(), general_f64_parse(&b_str[get_leading_gen(b_str)]),
), ),
SortMode::Month => month_compare(a_str, b_str), SortMode::Month => month_compare(a_str, b_str),
SortMode::Version => version_compare(a_str, b_str), SortMode::Version => version_compare(a_str, b_str),
@ -1268,7 +1375,7 @@ fn get_leading_gen(input: &str) -> Range<usize> {
leading_whitespace_len..input.len() leading_whitespace_len..input.len()
} }
#[derive(Copy, Clone, PartialEq, PartialOrd)] #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, PartialOrd)]
enum GeneralF64ParseResult { enum GeneralF64ParseResult {
Invalid, Invalid,
NaN, NaN,

View file

@ -15,6 +15,31 @@ fn test_helper(file_name: &str, args: &str) {
.stdout_is_fixture(format!("{}.expected.debug", file_name)); .stdout_is_fixture(format!("{}.expected.debug", file_name));
} }
// FYI, the initialization size of our Line struct is 96 bytes.
//
// At very small buffer sizes, with that overhead we are certainly going
// to overrun our buffer way, way, way too quickly because of these excess
// bytes for the struct.
//
// For instance, seq 0..20000 > ...text = 108894 bytes
// But overhead is 1920000 + 108894 = 2028894 bytes
//
// Or kjvbible-random.txt = 4332506 bytes, but minimum size of its
// 99817 lines in memory * 96 bytes = 9582432 bytes
//
// Here, we test 108894 bytes with a 50K buffer
//
#[test]
fn test_larger_than_specified_segment() {
new_ucmd!()
.arg("-n")
.arg("-S")
.arg("50K")
.arg("ext_sort.txt")
.succeeds()
.stdout_is_fixture(format!("{}", "ext_sort.expected"));
}
#[test] #[test]
fn test_months_whitespace() { fn test_months_whitespace() {
test_helper("months-whitespace", "-M"); test_helper("months-whitespace", "-M");
@ -34,6 +59,18 @@ fn test_human_numeric_whitespace() {
test_helper("human-numeric-whitespace", "-h"); test_helper("human-numeric-whitespace", "-h");
} }
// This tests where serde often fails when reading back JSON
// if it finds a null value
#[test]
fn test_extsort_as64_bailout() {
new_ucmd!()
.arg("-g")
.arg("-S 5K")
.arg("multiple_decimals_general.txt")
.succeeds()
.stdout_is_fixture("multiple_decimals_general.expected");
}
#[test] #[test]
fn test_multiple_decimals_general() { fn test_multiple_decimals_general() {
test_helper("multiple_decimals_general", "-g") test_helper("multiple_decimals_general", "-g")

20000
tests/fixtures/sort/ext_sort.expected vendored Normal file

File diff suppressed because it is too large Load diff

20000
tests/fixtures/sort/ext_sort.txt vendored Normal file

File diff suppressed because it is too large Load diff