From 2b8a6e98eeed5d86a5af8a0a87df2d04dca55a0a Mon Sep 17 00:00:00 2001 From: electricboogie <32370782+electricboogie@users.noreply.github.com> Date: Sun, 25 Apr 2021 00:20:56 -0500 Subject: [PATCH] Working ExtSort --- Cargo.lock | 2 +- src/uu/sort/Cargo.toml | 2 +- src/uu/sort/src/ext_sorter/LICENSE | 202 --------------------- src/uu/sort/src/ext_sorter/NOTICE | 9 - src/uu/sort/src/ext_sorter/mod.rs | 277 ----------------------------- src/uu/sort/src/sort.rs | 108 +++++------ 6 files changed, 47 insertions(+), 553 deletions(-) delete mode 100644 src/uu/sort/src/ext_sorter/LICENSE delete mode 100644 src/uu/sort/src/ext_sorter/NOTICE delete mode 100644 src/uu/sort/src/ext_sorter/mod.rs diff --git a/Cargo.lock b/Cargo.lock index eb99af34b..d5dbf3508 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2305,7 +2305,7 @@ dependencies = [ "serde", "serde_json", "smallvec 1.6.1", - "tempfile", + "tempdir", "uucore", "uucore_procs", ] diff --git a/src/uu/sort/Cargo.toml b/src/uu/sort/Cargo.toml index f29df6ab8..12c685e23 100644 --- a/src/uu/sort/Cargo.toml +++ b/src/uu/sort/Cargo.toml @@ -26,7 +26,7 @@ semver = "0.9.0" smallvec = { version = "1.6.1", features = ["serde"] } uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] } uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" } -tempfile = "3.1.0" +tempdir = "0.3.7" [[bin]] name = "sort" diff --git a/src/uu/sort/src/ext_sorter/LICENSE b/src/uu/sort/src/ext_sorter/LICENSE deleted file mode 100644 index fe647bd7f..000000000 --- a/src/uu/sort/src/ext_sorter/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - -Copyright [yyyy] [name of copyright owner] - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. \ No newline at end of file diff --git a/src/uu/sort/src/ext_sorter/NOTICE b/src/uu/sort/src/ext_sorter/NOTICE deleted file mode 100644 index fdfc6f04f..000000000 --- a/src/uu/sort/src/ext_sorter/NOTICE +++ /dev/null @@ -1,9 +0,0 @@ -ext_sorter -Copyright 2018 Andre-Philippe Paquet -Modifications copyright 2021 Robert Swinford - -This ext_sorter module includes software developed by Andre-Philippe Paquet as extsort. - -The sorter.rs file was copied and modified for use in the uutils' coreutils subproject, sort. - -sort is licensed according to the term of the LICENSE file found in root directory of the uutils' coreutils project. \ No newline at end of file diff --git a/src/uu/sort/src/ext_sorter/mod.rs b/src/uu/sort/src/ext_sorter/mod.rs deleted file mode 100644 index a90be6bb0..000000000 --- a/src/uu/sort/src/ext_sorter/mod.rs +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright 2018 Andre-Philippe Paquet -// Modifications copyright 2021 Robert Swinford -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// This file has been modified for use in the uutils' coreutils subproject, sort. - -use rayon::prelude::*; -use std::{ - cmp::Ordering, - collections::VecDeque, - fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Error, Read, Seek, SeekFrom, Write}, - path::{Path, PathBuf}, -}; - -/// Exposes external sorting (i.e. on disk sorting) capability on arbitrarily -/// sized iterator, even if the generated content of the iterator doesn't fit in -/// memory. -/// -/// It uses an in-memory buffer sorted and flushed to disk in segment files when -/// full. Once sorted, it returns a new sorted iterator with all items. In order -/// to remain efficient for all implementations, the crate doesn't handle -/// serialization, but leaves that to the user. -pub struct ExternalSorter { - segment_size: usize, - sort_dir: Option, - parallel: bool, -} - -impl ExternalSorter { - pub fn new() -> ExternalSorter { - ExternalSorter { - // Default is 16G - But we never use it, - // because we always set or ignore - segment_size: 16000000000, - sort_dir: None, - parallel: false, - } - } - - /// Sets the maximum size of each segment in number of sorted items. - /// - /// This number of items needs to fit in memory. While sorting, a - /// in-memory buffer is used to collect the items to be sorted. Once - /// it reaches the maximum size, it is sorted and then written to disk. - /// - /// Using a higher segment size makes sorting faster by leveraging - /// faster in-memory operations. - pub fn with_segment_size(mut self, size: usize) -> Self { - self.segment_size = size; - self - } - - /// Sets directory in which sorted segments will be written (if it doesn't - /// fit in memory). - pub fn with_sort_dir(mut self, path: PathBuf) -> Self { - self.sort_dir = Some(path); - self - } - - /// Uses Rayon to sort the in-memory buffer. - /// - /// This may not be needed if the buffer isn't big enough for parallelism to - /// be gainful over the overhead of multithreading. - pub fn with_parallel_sort(mut self) -> Self { - self.parallel = true; - self - } - - /// Sorts a given iterator with a comparator function, returning a new iterator with items - pub fn sort_by(&self, iterator: I, cmp: F) -> Result, Error> - where - T: Sortable, - I: Iterator, - F: Fn(&T, &T) -> Ordering + Send + Sync, - { - let mut tempdir: Option = None; - let mut sort_dir: Option = None; - - let mut count = 0; - let mut segments_file: Vec = Vec::new(); - - let size_of_items = std::mem::size_of::(); - // Get size of iterator - let (_, upper_bound) = iterator.size_hint(); - // Buffer size specified + minimum overhead of struct / size of items - let initial_capacity = (self.segment_size + (upper_bound.unwrap() * size_of_items)) / size_of_items; - let mut buffer: Vec = Vec::with_capacity(initial_capacity); - - for next_item in iterator { - count += 1; - buffer.push(next_item); - // if after push, number of elements in vector > initial capacity - if buffer.len() > initial_capacity { - let sort_dir = self.lazy_create_dir(&mut tempdir, &mut sort_dir)?; - self.sort_and_write_segment(sort_dir, &mut segments_file, &mut buffer, &cmp)?; - // Truncate buffer back to initial capacity - buffer.truncate(initial_capacity); - } - } - - // Write any items left in buffer, but only if we had at least 1 segment - // written. Otherwise we use the buffer itself to iterate from memory - let pass_through_queue = if !buffer.is_empty() && !segments_file.is_empty() { - let sort_dir = self.lazy_create_dir(&mut tempdir, &mut sort_dir)?; - self.sort_and_write_segment(sort_dir, &mut segments_file, &mut buffer, &cmp)?; - None - } else { - buffer.sort_by(&cmp); - Some(VecDeque::from(buffer)) - }; - - SortedIterator::new(tempdir, pass_through_queue, segments_file, count, cmp) - } - - /// We only want to create directory if it's needed (i.e. if the dataset - /// doesn't fit in memory) to prevent filesystem latency - fn lazy_create_dir<'a>( - &self, - tempdir: &mut Option, - sort_dir: &'a mut Option, - ) -> Result<&'a Path, Error> { - if let Some(sort_dir) = sort_dir { - return Ok(sort_dir); - } - - *sort_dir = if let Some(ref sort_dir) = self.sort_dir { - Some(sort_dir.to_path_buf()) - } else { - *tempdir = Some(tempfile::TempDir::new()?); - Some(tempdir.as_ref().unwrap().path().to_path_buf()) - }; - - Ok(sort_dir.as_ref().unwrap()) - } - - fn sort_and_write_segment( - &self, - sort_dir: &Path, - segments: &mut Vec, - buffer: &mut Vec, - cmp: F, - ) -> Result<(), Error> - where - T: Sortable, - F: Fn(&T, &T) -> Ordering + Send + Sync, - { - if self.parallel { - buffer.par_sort_by(|a, b| cmp(a, b)); - } else { - buffer.sort_by(|a, b| cmp(a, b)); - } - - let segment_path = sort_dir.join(format!("{}", segments.len())); - let segment_file = OpenOptions::new() - .create(true) - .truncate(true) - .read(true) - .write(true) - .open(&segment_path)?; - let mut buf_writer = BufWriter::new(segment_file); - - // Possible panic here. - // Why use drain here, if we want to dump the entire buffer? - // Was "buffer.drain(0..)" - for item in buffer { - item.encode(&mut buf_writer); - } - - let file = buf_writer.into_inner()?; - segments.push(file); - - Ok(()) - } -} - -impl Default for ExternalSorter { - fn default() -> Self { - ExternalSorter::new() - } -} - -pub trait Sortable: Sized + Send { - fn encode(&self, writer: &mut W); - fn decode(reader: &mut R) -> Option; -} - -pub struct SortedIterator { - _tempdir: Option, - pass_through_queue: Option>, - segments_file: Vec>, - next_values: Vec>, - count: u64, - cmp: F, -} - -impl Ordering + Send + Sync> SortedIterator { - fn new( - tempdir: Option, - pass_through_queue: Option>, - mut segments_file: Vec, - count: u64, - cmp: F, - ) -> Result, Error> { - for segment in &mut segments_file { - segment.seek(SeekFrom::Start(0))?; - } - - let next_values = segments_file - .iter_mut() - .map(|file| T::decode(file)) - .collect(); - - let segments_file_buffered = segments_file.into_iter().map(BufReader::new).collect(); - - Ok(SortedIterator { - _tempdir: tempdir, - pass_through_queue, - segments_file: segments_file_buffered, - next_values, - count, - cmp, - }) - } - - pub fn sorted_count(&self) -> u64 { - self.count - } -} - -impl Ordering> Iterator for SortedIterator { - type Item = T; - - fn next(&mut self) -> Option { - // if we have a pass through, we dequeue from it directly - if let Some(ptb) = self.pass_through_queue.as_mut() { - return ptb.pop_front(); - } - - // otherwise, we iter from segments on disk - let mut smallest_idx: Option = None; - { - let mut smallest: Option<&T> = None; - for idx in 0..self.segments_file.len() { - let next_value = self.next_values[idx].as_ref(); - if next_value.is_none() { - continue; - } - - if smallest.is_none() - || (self.cmp)(next_value.unwrap(), smallest.unwrap()) == Ordering::Less - { - smallest = Some(next_value.unwrap()); - smallest_idx = Some(idx); - } - } - } - - smallest_idx.map(|idx| { - let file = &mut self.segments_file[idx]; - let value = self.next_values[idx].take().unwrap(); - self.next_values[idx] = T::decode(file); - value - }) - } -} diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 571541fc6..8c3a0cf7f 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -15,11 +15,11 @@ #[macro_use] extern crate uucore; -mod ext_sorter; mod numeric_str_cmp; +mod external_sort; +use external_sort::{ExternalSorter, ExternallySortable}; use clap::{App, Arg}; -use ext_sorter::{ExternalSorter, Sortable}; use fnv::FnvHasher; use itertools::Itertools; use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings}; @@ -27,7 +27,7 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use rayon::prelude::*; use semver::Version; -use serde::{Deserialize, Serialize}; +use serde::{Deserializer, Deserialize, Serialize}; use smallvec::SmallVec; use std::borrow::Cow; use std::cmp::Ordering; @@ -103,7 +103,7 @@ enum SortMode { Version, Default, } - +#[derive(Clone)] struct GlobalSettings { mode: SortMode, ignore_blanks: bool, @@ -176,7 +176,7 @@ impl Default for GlobalSettings { } } } - +#[derive(Clone)] struct KeySettings { mode: SortMode, ignore_blanks: bool, @@ -201,7 +201,7 @@ impl From<&GlobalSettings> for KeySettings { } } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone)] /// Represents the string selected by a FieldSelector. enum SelectionRange { /// If we had to transform this selection, we have to store a new string. @@ -232,13 +232,23 @@ impl SelectionRange { } } } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + +#[derive(Debug, Serialize, Deserialize, Clone)] enum NumCache { + #[serde(deserialize_with="bailout_parse_f64")] AsF64(f64), WithInfo(NumInfo), None, } +// Only used when serde can't parse a null value +fn bailout_parse_f64<'de, D>(d: D) -> Result where D: Deserializer<'de> { + Deserialize::deserialize(d) + .map(|x: Option<_>| { + x.unwrap_or(0f64) + }) +} + impl NumCache { fn as_f64(&self) -> f64 { match self { @@ -253,7 +263,7 @@ impl NumCache { } } } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone)] struct Selection { range: SelectionRange, num_cache: NumCache, @@ -267,56 +277,29 @@ impl Selection { } type Field = Range; -#[derive(Debug, Serialize, Deserialize)] + +#[derive(Serialize, Deserialize, Clone)] struct Line { line: String, // The common case is not to specify fields. Let's make this fast. selections: SmallVec<[Selection; 1]>, } -impl Sortable for Line { - fn encode(&self, write: &mut W) { - let line = Line { - line: self.line.to_owned(), - selections: self.selections.to_owned(), - }; - let serialized = serde_json::to_string(&line).unwrap(); - // Each instance of valid JSON needs to be seperated by something, so here we use a newline - write - .write_all(format!("{}{}", serialized, "\n").as_bytes()) - .unwrap(); - } - - // This crate asks us to write one Line struct at a time, but then returns multiple Lines to us at once. - // We concatanate them and return them as one big Line here. - fn decode(read: &mut R) -> Option { - let buf_reader = BufReader::new(read); - let result = { - let mut line_joined = String::new(); - // Return an empty vec for selections - let selections_joined = SmallVec::new(); - let mut p_iter = buf_reader.lines().peekable(); - while let Some(line) = p_iter.next() { - let deserialized_line: Line = - serde_json::from_str(&line.as_ref().unwrap()).unwrap(); - if let Some(_next_line) = p_iter.peek() { - line_joined = format!("{}\n{}\n", line_joined, deserialized_line.line) - } else { - line_joined = format!("{}\n{}", line_joined, deserialized_line.line) - } - // I think we've done our sorting already and these selctions are irrelevant? - // @miDeb what's your sense? Could we just return an empty vec? - //selections_joined.append(&mut deserialized_line.selections); - } - Some(Line { - line: line_joined, - selections: selections_joined, - }) - }; - result +impl ExternallySortable for Line { + fn get_size(&self) -> u64 { + // Currently 96 bytes, but that could change, so we get that size here + std::mem::size_of::() as u64 } } +impl PartialEq for Line { + fn eq(&self, other: &Self) -> bool { + self.line == other.line + } +} + +impl Eq for Line {} + impl Line { fn new(line: String, settings: &GlobalSettings) -> Self { let fields = if settings @@ -449,6 +432,7 @@ fn tokenize_with_separator(line: &str, separator: char) -> Vec { tokens } +#[derive(Clone)] struct KeyPosition { /// 1-indexed, 0 is invalid. field: usize, @@ -516,7 +500,7 @@ impl KeyPosition { } } } - +#[derive(Clone)] struct FieldSelector { from: KeyPosition, to: Option, @@ -1014,10 +998,10 @@ pub fn uumain(args: impl uucore::Args) -> i32 { }); } - exec(files, &settings) + exec(files, settings) } -fn exec(files: Vec, settings: &GlobalSettings) -> i32 { +fn exec(files: Vec, settings: GlobalSettings) -> i32 { let mut lines = Vec::new(); let mut file_merger = FileMerger::new(&settings); @@ -1059,7 +1043,7 @@ fn exec(files: Vec, settings: &GlobalSettings) -> i32 { // Probably faster that we don't create // an owned value each run if settings.buffer_size != DEFAULT_BUF_SIZE { - lines = ext_sort_by(lines, &settings); + lines = ext_sort_by(lines, settings.clone()); } else { sort_by(&mut lines, &settings); } @@ -1074,7 +1058,7 @@ fn exec(files: Vec, settings: &GlobalSettings) -> i32 { print_sorted( lines .into_iter() - .dedup_by(|a, b| compare_by(a, b, settings) == Ordering::Equal) + .dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal) .map(|line| line.line), &settings, ) @@ -1117,15 +1101,13 @@ fn exec_check_file(unwrapped_lines: &[Line], settings: &GlobalSettings) -> i32 { } } -fn ext_sort_by(lines: Vec, settings: &GlobalSettings) -> Vec { - let sorter = ExternalSorter::new() - .with_segment_size(settings.buffer_size) - .with_sort_dir(settings.tmp_dir.clone()) - .with_parallel_sort(); - sorter - .sort_by(lines.into_iter(), |a, b| compare_by(a, b, &settings)) - .unwrap() - .collect() +fn ext_sort_by(unsorted: Vec, settings: GlobalSettings) -> Vec { + let external_sorter = ExternalSorter::new(settings.buffer_size as u64, Some(settings.tmp_dir.clone()), settings.clone()); + let iter = external_sorter.sort_by(unsorted.into_iter(), settings.clone()).unwrap(); + let vec = iter.filter(|x| x.is_ok() ) + .map(|x| x.unwrap()) + .collect::>(); + vec } fn sort_by(lines: &mut Vec, settings: &GlobalSettings) {