1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-28 03:27:44 +00:00

Fix NumCache and Serde JSON conflict by disabling NumCache during extsort general numeric compares

This commit is contained in:
electricboogie 2021-04-25 10:03:29 -05:00
parent 2b8a6e98ee
commit 26fc8e57c7
4 changed files with 294 additions and 19 deletions

View file

@ -0,0 +1,19 @@
Copyright 2018 Battelle Memorial Institute
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,246 @@
use std::{clone::Clone};
use std::cmp::Ordering::Less;
use std::collections::VecDeque;
use std::error::Error;
use std::fs::{File, OpenOptions};
use std::io::SeekFrom::Start;
use std::io::{BufRead, BufReader, Seek, Write};
use std::marker::PhantomData;
use std::path::PathBuf;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json;
use tempdir::TempDir;
use super::{GlobalSettings, Line};
/// Trait for types that can be used by
/// [ExternalSorter](struct.ExternalSorter.html). Must be sortable, cloneable,
/// serializeable, and able to report on it's size
pub trait ExternallySortable: Clone + Serialize + DeserializeOwned {
/// Get the size, in bytes, of this object (used to constrain the buffer
/// used in the external sort).
fn get_size(&self) -> u64;
}
/// Iterator that provides sorted `T`s
pub struct ExtSortedIterator<Line> {
buffers: Vec<VecDeque<Line>>,
chunk_offsets: Vec<u64>,
max_per_chunk: u64,
chunks: u64,
tmp_dir: TempDir,
settings: GlobalSettings,
failed: bool,
}
impl Iterator for ExtSortedIterator<Line>
where
Line: ExternallySortable,
{
type Item = Result<Line, Box<dyn Error>>;
/// # Errors
///
/// This method can fail due to issues reading intermediate sorted chunks
/// from disk, or due to serde deserialization issues
fn next(&mut self) -> Option<Self::Item> {
if self.failed {
return None;
}
// fill up any empty buffers
let mut empty = true;
for chunk_num in 0..self.chunks {
if self.buffers[chunk_num as usize].is_empty() {
let mut f = match File::open(self.tmp_dir.path().join(chunk_num.to_string())) {
Ok(f) => f,
Err(e) => {
self.failed = true;
return Some(Err(Box::new(e)));
}
};
match f.seek(Start(self.chunk_offsets[chunk_num as usize])) {
Ok(_) => (),
Err(e) => {
self.failed = true;
return Some(Err(Box::new(e)));
}
}
let bytes_read =
match fill_buff(&mut self.buffers[chunk_num as usize], f, self.max_per_chunk) {
Ok(bytes_read) => bytes_read,
Err(e) => {
self.failed = true;
return Some(Err(e));
}
};
self.chunk_offsets[chunk_num as usize] += bytes_read;
if !self.buffers[chunk_num as usize].is_empty() {
empty = false;
}
} else {
empty = false;
}
}
if empty {
return None;
}
// find the next record to write
// check is_empty() before unwrap()ing
let mut idx = 0;
for chunk_num in 0..self.chunks as usize {
if !self.buffers[chunk_num].is_empty() {
if self.buffers[idx].is_empty() || (super::compare_by)(
self.buffers[chunk_num].front().unwrap(),
self.buffers[idx].front().unwrap(),
&self.settings
) == Less
{
idx = chunk_num;
}
}
}
// unwrap due to checks above
let r = self.buffers[idx].pop_front().unwrap();
Some(Ok(r))
}
}
/// Perform an external sort on an unsorted stream of incoming data
pub struct ExternalSorter<Line>
where
Line: ExternallySortable,
{
tmp_dir: Option<PathBuf>,
buffer_bytes: u64,
phantom: PhantomData<Line>,
settings: GlobalSettings,
}
impl ExternalSorter<Line>
where
Line: ExternallySortable,
{
/// Create a new `ExternalSorter` with a specified memory buffer and
/// temporary directory
pub fn new(buffer_bytes: u64, tmp_dir: Option<PathBuf>, settings: GlobalSettings) -> ExternalSorter<Line> {
ExternalSorter {
buffer_bytes,
tmp_dir,
phantom: PhantomData,
settings,
}
}
/// Sort (based on `compare`) the `T`s provided by `unsorted` and return an
/// iterator
///
/// # Errors
///
/// This method can fail due to issues writing intermediate sorted chunks
/// to disk, or due to serde serialization issues
pub fn sort_by<I>(&self, unsorted: I, settings: GlobalSettings) -> Result<ExtSortedIterator<Line>, Box<dyn Error>>
where
I: Iterator<Item = Line>,
{
let tmp_dir = match self.tmp_dir {
Some(ref p) => TempDir::new_in(p, "uutils_sort")?,
None => TempDir::new("uutils_sort")?,
};
// creating the thing we need to return first due to the face that we need to
// borrow tmp_dir and move it out
let mut iter = ExtSortedIterator {
buffers: Vec::new(),
chunk_offsets: Vec::new(),
max_per_chunk: 0,
chunks: 0,
tmp_dir,
settings,
failed: false,
};
{
let mut total_read = 0;
let mut chunk = Vec::new();
// make the initial chunks on disk
for seq in unsorted {
total_read += seq.get_size();
chunk.push(seq);
if total_read >= self.buffer_bytes {
super::sort_by(&mut chunk, &self.settings);
self.write_chunk(
&iter.tmp_dir.path().join(iter.chunks.to_string()),
&mut chunk,
)?;
chunk.clear();
total_read = 0;
iter.chunks += 1;
}
}
// write the last chunk
if chunk.len() > 0 {
super::sort_by(&mut chunk, &self.settings);
self.write_chunk(
&iter.tmp_dir.path().join(iter.chunks.to_string()),
&mut chunk,
)?;
iter.chunks += 1;
}
// initialize buffers for each chunk
iter.max_per_chunk = self.buffer_bytes.checked_div(iter.chunks).unwrap_or(self.buffer_bytes);
iter.buffers = vec![VecDeque::new(); iter.chunks as usize];
iter.chunk_offsets = vec![0 as u64; iter.chunks as usize];
for chunk_num in 0..iter.chunks {
let offset = fill_buff(
&mut iter.buffers[chunk_num as usize],
File::open(iter.tmp_dir.path().join(chunk_num.to_string()))?,
iter.max_per_chunk,
)?;
iter.chunk_offsets[chunk_num as usize] = offset;
}
}
Ok(iter)
}
fn write_chunk(&self, file: &PathBuf, chunk: &mut Vec<Line>) -> Result<(), Box<dyn Error>> {
let mut new_file = OpenOptions::new().create(true).append(true).open(file)?;
for s in chunk {
let mut serialized = serde_json::to_string(&s).expect("JSON write error: ");
serialized.push_str("\n");
new_file.write_all(serialized.as_bytes())?;
}
Ok(())
}
}
fn fill_buff<Line>(vec: &mut VecDeque<Line>, file: File, max_bytes: u64) -> Result<u64, Box<dyn Error>>
where
Line: ExternallySortable,
{
let mut total_read = 0;
let mut bytes_read = 0;
for line in BufReader::new(file).lines() {
let line_s = line?;
bytes_read += line_s.len() + 1;
// This is where the bad stuff happens usually
let deserialized: Line = match serde_json::from_str(&line_s) {
Ok(x) => x,
Err(err) => panic!("JSON read error: {}", err),
};
total_read += deserialized.get_size();
vec.push_back(deserialized);
if total_read > max_bytes {
break;
}
}
Ok(bytes_read as u64)
}

View file

@ -91,7 +91,7 @@ static NEGATIVE: char = '-';
static POSITIVE: char = '+';
static DEFAULT_TMPDIR: &str = r"/tmp";
// 16GB buffer for Vec<Line> before we dump to disk
// 16GB buffer for Vec<Line> before we dump to disk, never used
static DEFAULT_BUF_SIZE: usize = 16000000000;
#[derive(Eq, Ord, PartialEq, PartialOrd, Clone)]
@ -292,14 +292,6 @@ impl ExternallySortable for Line {
}
}
impl PartialEq for Line {
fn eq(&self, other: &Self) -> bool {
self.line == other.line
}
}
impl Eq for Line {}
impl Line {
fn new(line: String, settings: &GlobalSettings) -> Self {
let fields = if settings
@ -343,7 +335,7 @@ impl Line {
);
range.shorten(num_range);
NumCache::WithInfo(info)
} else if selector.settings.mode == SortMode::GeneralNumeric {
} else if selector.settings.mode == SortMode::GeneralNumeric && settings.buffer_size == DEFAULT_BUF_SIZE {
NumCache::AsF64(permissive_f64_parse(get_leading_gen(range.get_str(&line))))
} else {
NumCache::None
@ -1103,11 +1095,12 @@ fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 {
fn ext_sort_by(unsorted: Vec<Line>, settings: GlobalSettings) -> Vec<Line> {
let external_sorter = ExternalSorter::new(settings.buffer_size as u64, Some(settings.tmp_dir.clone()), settings.clone());
let iter = external_sorter.sort_by(unsorted.into_iter(), settings.clone()).unwrap();
let vec = iter.filter(|x| x.is_ok() )
.map(|x| x.unwrap())
.collect::<Vec<Line>>();
vec
let iter = external_sorter
.sort_by(unsorted.into_iter(), settings.clone())
.unwrap()
.map(|x| x.unwrap())
.collect::<Vec<Line>>();
iter
}
fn sort_by(lines: &mut Vec<Line>, settings: &GlobalSettings) {
@ -1130,10 +1123,15 @@ fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering
(a_str, a_selection.num_cache.as_num_info()),
(b_str, b_selection.num_cache.as_num_info()),
),
SortMode::GeneralNumeric => general_numeric_compare(
a_selection.num_cache.as_f64(),
b_selection.num_cache.as_f64(),
),
// serde JSON has issues with f64 null values, so caching them won't work for us with ext sort
SortMode::GeneralNumeric =>
if global_settings.buffer_size == DEFAULT_BUF_SIZE {
general_numeric_compare(a_selection.num_cache.as_f64(),
b_selection.num_cache.as_f64())
} else {
general_numeric_compare(permissive_f64_parse(get_leading_gen(a_str)),
permissive_f64_parse(get_leading_gen(b_str)))
},
SortMode::Month => month_compare(a_str, b_str),
SortMode::Version => version_compare(a_str, b_str),
SortMode::Default => default_compare(a_str, b_str),

View file

@ -51,6 +51,18 @@ fn test_human_numeric_whitespace() {
test_helper("human-numeric-whitespace", "-h");
}
// This doesn't test the ext sort feature as such, just this codepath where
// ext sort can fail when reading back JSON if it finds a null value
#[test]
fn test_extsort_as64_bailout() {
new_ucmd!()
.arg("-g")
.arg("-S 10K")
.arg("multiple_decimals_general.txt")
.succeeds()
.stdout_is("\n\n\n\n\n\n\n\nCARAvan\n-2028789030\n-896689\n-8.90880\n-1\n-.05\n000\n00000001\n1\n1.040000000\n1.444\n1.58590\n8.013\n45\n46.89\n576,446.88800000\n576,446.890\n 4567.\n4567.1\n4567.34\n\t\t\t\t\t\t\t\t\t\t4567..457\n\t\t\t\t37800\n\t\t\t\t\t\t45670.89079.098\n\t\t\t\t\t\t45670.89079.1\n4798908.340000000000\n4798908.45\n4798908.8909800\n");
}
#[test]
fn test_multiple_decimals_general() {
new_ucmd!()