
Merge pull request #2827 from jfinkels/split-std-io-copy

split: use std::io::copy() with new writer implementation to improve maintainability and speed
Sylvestre Ledru 2022-02-12 11:33:12 +01:00 committed by GitHub
commit 6b6d5ee7db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 333 additions and 146 deletions
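The shape of the change, in brief: instead of a `Splitter` trait with a `consume()` loop, the output side becomes a type that implements `std::io::Write` and rotates its underlying output whenever the current chunk is full, so the whole transfer can be delegated to `std::io::copy()`. A minimal, self-contained sketch of that pattern follows; the `ChunkSink` type and its in-memory `Vec` chunks are illustrative stand-ins, not types from this PR:

    // Minimal sketch only: a Write implementation that caps each chunk at a
    // fixed number of bytes and rotates to a new in-memory buffer when the
    // current one is full, driven by std::io::copy().
    use std::io::{self, Read, Write};

    struct ChunkSink {
        chunk_size: usize,
        remaining: usize,
        chunks: Vec<Vec<u8>>, // stand-in for the per-chunk output files
    }

    impl ChunkSink {
        fn new(chunk_size: usize) -> Self {
            Self { chunk_size, remaining: chunk_size, chunks: vec![Vec::new()] }
        }
    }

    impl Write for ChunkSink {
        fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
            let mut written = 0;
            while !buf.is_empty() {
                if self.remaining == 0 {
                    // "Open" the next chunk, like creating the next part file.
                    self.chunks.push(Vec::new());
                    self.remaining = self.chunk_size;
                }
                let n = buf.len().min(self.remaining);
                self.chunks.last_mut().unwrap().extend_from_slice(&buf[..n]);
                self.remaining -= n;
                written += n;
                buf = &buf[n..];
            }
            Ok(written)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    fn main() -> io::Result<()> {
        let mut reader: Box<dyn Read> = Box::new(&b"abcdefghij"[..]);
        let mut writer = ChunkSink::new(4);
        io::copy(&mut reader, &mut writer)?;
        assert_eq!(
            writer.chunks,
            vec![b"abcd".to_vec(), b"efgh".to_vec(), b"ij".to_vec()]
        );
        Ok(())
    }

The real `ByteChunkWriter` in the diff below does the same rotation but opens a new output file for each chunk via `FilenameIterator` and `platform::instantiate_current_writer`.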

Cargo.lock generated

@@ -2850,6 +2850,7 @@ name = "uu_split"
version = "0.0.12"
dependencies = [
"clap 3.0.10",
"memchr 2.4.1",
"uucore",
]


@@ -0,0 +1,47 @@
<!-- spell-checker:ignore testfile -->

# Benchmarking to measure performance

To compare the performance of the `uutils` version of `split` with the
GNU version of `split`, you can use a benchmarking tool like
[hyperfine][0]. On Ubuntu 18.04 or later, you can install `hyperfine` by
running

    sudo apt-get install hyperfine

Next, build the `split` binary under the release profile:

    cargo build --release -p uu_split

Now, get a text file to test `split` on. The `split` program has three
main modes of operation: chunk by lines, chunk by bytes, and chunk by
lines with a byte limit. You may want to test the performance of `split`
with various shapes and sizes of input files and under various modes of
operation. For example, to test chunking by bytes on a large input file,
you can create a file named `testfile.txt` containing one million null
bytes like this:

    printf "%0.s\0" {1..1000000} > testfile.txt

For another example, to test chunking by bytes on a large real-world
input file, you could download a [database dump of Wikidata][1] or some
related files that the Wikimedia project provides. For example, [this
file][2] contains about 130 million lines.

Finally, you can compare the performance of the two versions of `split`
by running, for example,

    cd /tmp && hyperfine \
        --prepare 'rm x* || true' \
        "split -b 1000 testfile.txt" \
        "target/release/split -b 1000 testfile.txt"

Since `split` creates a lot of files on the filesystem, I recommend
changing to the `/tmp` directory before running the benchmark. The
`--prepare` argument to `hyperfine` runs a specified command before each
timing run. We specify `rm x* || true` so that the output files from the
previous run of `split` are removed before each run begins.

[0]: https://github.com/sharkdp/hyperfine
[1]: https://www.wikidata.org/wiki/Wikidata:Database_download
[2]: https://dumps.wikimedia.org/wikidatawiki/20211001/wikidatawiki-20211001-pages-logging.xml.gz


@@ -16,6 +16,7 @@ path = "src/split.rs"
[dependencies]
clap = { version = "3.0", features = ["wrap_help", "cargo"] }
memchr = "2"
uucore = { version=">=0.0.11", package="uucore", path="../../uucore" }
[[bin]]


@@ -13,16 +13,16 @@ mod platform;
use crate::filenames::FilenameIterator;
use clap::{crate_version, App, AppSettings, Arg, ArgMatches};
use std::convert::TryFrom;
use std::env;
use std::fmt;
use std::fs::{metadata, remove_file, File};
use std::io::{stdin, BufRead, BufReader, BufWriter, Read, Write};
use std::fs::{metadata, File};
use std::io::{stdin, BufReader, BufWriter, ErrorKind, Read, Write};
use std::num::ParseIntError;
use std::path::Path;
use uucore::display::Quotable;
use uucore::error::{FromIo, UResult, USimpleError, UUsageError};
use uucore::error::{FromIo, UIoError, UResult, USimpleError, UUsageError};
use uucore::parse_size::{parse_size, ParseSizeError};
use uucore::uio_error;
static OPT_BYTES: &str = "bytes";
static OPT_LINE_BYTES: &str = "line-bytes";
@@ -332,102 +332,235 @@ impl Settings {
}
}
trait Splitter {
// Consume as much as possible from `reader` so as to saturate `writer`.
// Equivalent to finishing one of the part files. Returns the number of
// bytes that have been moved.
fn consume(
&mut self,
reader: &mut BufReader<Box<dyn Read>>,
writer: &mut BufWriter<Box<dyn Write>>,
) -> std::io::Result<u128>;
/// Write a certain number of bytes to one file, then move on to another one.
///
/// This struct maintains an underlying writer representing the
/// current chunk of the output. If a call to [`write`] would cause
/// the underlying writer to write more than the allowed number of
/// bytes, a new writer is created and the excess bytes are written to
/// that one instead. As many new underlying writers are created as
/// needed to write all the bytes in the input buffer.
struct ByteChunkWriter<'a> {
/// Parameters for creating the underlying writer for each new chunk.
settings: &'a Settings,
/// The maximum number of bytes allowed for a single chunk of output.
chunk_size: usize,
/// Running total of number of chunks that have been completed.
num_chunks_written: usize,
/// Remaining capacity in number of bytes in the current chunk.
///
/// This number starts at `chunk_size` and decreases as bytes are
/// written. Once it reaches zero, a writer for a new chunk is
/// initialized and this number gets reset to `chunk_size`.
num_bytes_remaining_in_current_chunk: usize,
/// The underlying writer for the current chunk.
///
/// Once the number of bytes written to this writer exceeds
/// `chunk_size`, a new writer is initialized and assigned to this
/// field.
inner: BufWriter<Box<dyn Write>>,
/// Iterator that yields filenames for each chunk.
filename_iterator: FilenameIterator<'a>,
}
struct LineSplitter {
lines_per_split: usize,
}
impl LineSplitter {
fn new(chunk_size: usize) -> Self {
Self {
lines_per_split: chunk_size,
impl<'a> ByteChunkWriter<'a> {
fn new(chunk_size: usize, settings: &'a Settings) -> Option<ByteChunkWriter<'a>> {
let mut filename_iterator = FilenameIterator::new(
&settings.prefix,
&settings.additional_suffix,
settings.suffix_length,
settings.numeric_suffix,
);
let filename = filename_iterator.next()?;
if settings.verbose {
println!("creating file {}", filename.quote());
}
let inner = platform::instantiate_current_writer(&settings.filter, &filename);
Some(ByteChunkWriter {
settings,
chunk_size,
num_bytes_remaining_in_current_chunk: chunk_size,
num_chunks_written: 0,
inner,
filename_iterator,
})
}
}
impl Splitter for LineSplitter {
fn consume(
&mut self,
reader: &mut BufReader<Box<dyn Read>>,
writer: &mut BufWriter<Box<dyn Write>>,
) -> std::io::Result<u128> {
let mut bytes_consumed = 0u128;
let mut buffer = String::with_capacity(1024);
for _ in 0..self.lines_per_split {
let bytes_read = reader.read_line(&mut buffer)?;
// If we ever read 0 bytes then we know we've hit EOF.
if bytes_read == 0 {
return Ok(bytes_consumed);
impl<'a> Write for ByteChunkWriter<'a> {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
// If the length of `buf` exceeds the number of bytes remaining
// in the current chunk, we will need to write to multiple
// different underlying writers. In that case, each iteration of
// this loop writes to the underlying writer that corresponds to
// the current chunk number.
let mut carryover_bytes_written = 0;
loop {
if buf.is_empty() {
return Ok(carryover_bytes_written);
}
writer.write_all(buffer.as_bytes())?;
// Empty out the String buffer since `read_line` appends instead of
// replaces.
buffer.clear();
bytes_consumed += bytes_read as u128;
}
Ok(bytes_consumed)
}
}
struct ByteSplitter {
bytes_per_split: u128,
}
impl ByteSplitter {
fn new(chunk_size: usize) -> Self {
Self {
bytes_per_split: u128::try_from(chunk_size).unwrap(),
}
}
}
impl Splitter for ByteSplitter {
fn consume(
&mut self,
reader: &mut BufReader<Box<dyn Read>>,
writer: &mut BufWriter<Box<dyn Write>>,
) -> std::io::Result<u128> {
// We buffer reads and writes. We proceed until `bytes_consumed` is
// equal to `self.bytes_per_split` or we reach EOF.
let mut bytes_consumed = 0u128;
const BUFFER_SIZE: usize = 1024;
let mut buffer = [0u8; BUFFER_SIZE];
while bytes_consumed < self.bytes_per_split {
// Don't overshoot `self.bytes_per_split`! Note: Using std::cmp::min
// doesn't really work since we have to get types to match which
// can't be done in a way that keeps all conversions safe.
let bytes_desired = if (BUFFER_SIZE as u128) <= self.bytes_per_split - bytes_consumed {
BUFFER_SIZE
// If the capacity of this chunk is greater than the number of
// bytes in `buf`, then write all the bytes in `buf`. Otherwise,
// write enough bytes to fill the current chunk, then increment
// the chunk number and repeat.
let n = buf.len();
if n < self.num_bytes_remaining_in_current_chunk {
let num_bytes_written = self.inner.write(buf)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
return Ok(carryover_bytes_written + num_bytes_written);
} else {
// This is a safe conversion since the difference must be less
// than BUFFER_SIZE in this branch.
(self.bytes_per_split - bytes_consumed) as usize
};
let bytes_read = reader.read(&mut buffer[0..bytes_desired])?;
// If we ever read 0 bytes then we know we've hit EOF.
if bytes_read == 0 {
return Ok(bytes_consumed);
// Write enough bytes to fill the current chunk.
let i = self.num_bytes_remaining_in_current_chunk;
let num_bytes_written = self.inner.write(&buf[..i])?;
// It's possible that the underlying writer did not
// write all the bytes.
if num_bytes_written < i {
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
return Ok(carryover_bytes_written + num_bytes_written);
} else {
// Move the window to look at only the remaining bytes.
buf = &buf[i..];
// Increment the chunk number, reset the number of
// bytes remaining, and instantiate the new
// underlying writer.
self.num_chunks_written += 1;
self.num_bytes_remaining_in_current_chunk = self.chunk_size;
// Remember for the next iteration that we wrote these bytes.
carryover_bytes_written += num_bytes_written;
// Only create the writer for the next chunk if
// there are any remaining bytes to write. This
// check prevents us from creating a new empty
// file.
if !buf.is_empty() {
let filename = self.filename_iterator.next().ok_or_else(|| {
std::io::Error::new(ErrorKind::Other, "output file suffixes exhausted")
})?;
if self.settings.verbose {
println!("creating file {}", filename.quote());
}
self.inner =
platform::instantiate_current_writer(&self.settings.filter, &filename);
}
}
}
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
/// Write a certain number of lines to one file, then move on to another one.
///
/// This struct maintains an underlying writer representing the
/// current chunk of the output. If a call to [`write`] would cause
/// the underlying writer to write more than the allowed number of
/// lines, a new writer is created and the excess lines are written to
/// that one instead. As many new underlying writers are created as
/// needed to write all the lines in the input buffer.
struct LineChunkWriter<'a> {
/// Parameters for creating the underlying writer for each new chunk.
settings: &'a Settings,
/// The maximum number of lines allowed for a single chunk of output.
chunk_size: usize,
/// Running total of number of chunks that have been completed.
num_chunks_written: usize,
/// Remaining capacity in number of lines in the current chunk.
///
/// This number starts at `chunk_size` and decreases as lines are
/// written. Once it reaches zero, a writer for a new chunk is
/// initialized and this number gets reset to `chunk_size`.
num_lines_remaining_in_current_chunk: usize,
/// The underlying writer for the current chunk.
///
/// Once the number of lines written to this writer exceeds
/// `chunk_size`, a new writer is initialized and assigned to this
/// field.
inner: BufWriter<Box<dyn Write>>,
/// Iterator that yields filenames for each chunk.
filename_iterator: FilenameIterator<'a>,
}
impl<'a> LineChunkWriter<'a> {
fn new(chunk_size: usize, settings: &'a Settings) -> Option<LineChunkWriter<'a>> {
let mut filename_iterator = FilenameIterator::new(
&settings.prefix,
&settings.additional_suffix,
settings.suffix_length,
settings.numeric_suffix,
);
let filename = filename_iterator.next()?;
if settings.verbose {
println!("creating file {}", filename.quote());
}
let inner = platform::instantiate_current_writer(&settings.filter, &filename);
Some(LineChunkWriter {
settings,
chunk_size,
num_lines_remaining_in_current_chunk: chunk_size,
num_chunks_written: 0,
inner,
filename_iterator,
})
}
}
impl<'a> Write for LineChunkWriter<'a> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// If the number of lines in `buf` exceeds the number of lines
// remaining in the current chunk, we will need to write to
// multiple different underlying writers. In that case, each
// iteration of this loop writes to the underlying writer that
// corresponds to the current chunk number.
let mut prev = 0;
let mut total_bytes_written = 0;
for i in memchr::memchr_iter(b'\n', buf) {
// If we have exceeded the number of lines to write in the
// current chunk, then start a new chunk and its
// corresponding writer.
if self.num_lines_remaining_in_current_chunk == 0 {
self.num_chunks_written += 1;
let filename = self.filename_iterator.next().ok_or_else(|| {
std::io::Error::new(ErrorKind::Other, "output file suffixes exhausted")
})?;
if self.settings.verbose {
println!("creating file {}", filename.quote());
}
self.inner = platform::instantiate_current_writer(&self.settings.filter, &filename);
self.num_lines_remaining_in_current_chunk = self.chunk_size;
}
writer.write_all(&buffer[0..bytes_read])?;
bytes_consumed += bytes_read as u128;
// Write the line, starting from *after* the previous
// newline character and ending *after* the current
// newline character.
let n = self.inner.write(&buf[prev..i + 1])?;
total_bytes_written += n;
prev = i + 1;
self.num_lines_remaining_in_current_chunk -= 1;
}
Ok(bytes_consumed)
let n = self.inner.write(&buf[prev..buf.len()])?;
total_bytes_written += n;
Ok(total_bytes_written)
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
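The `LineChunkWriter::write` implementation above leans on `memchr::memchr_iter` (the crate this PR adds to `Cargo.toml`) to locate every newline in the incoming buffer, writing each line to the current chunk with its trailing newline included. A minimal sketch of just that slicing step; the `line_spans` helper is hypothetical and not part of the PR:

    // Sketch only: memchr::memchr_iter yields the index of every b'\n',
    // which drives the per-line writes in LineChunkWriter::write.
    fn line_spans(buf: &[u8]) -> Vec<&[u8]> {
        let mut prev = 0;
        let mut spans = Vec::new();
        for i in memchr::memchr_iter(b'\n', buf) {
            spans.push(&buf[prev..=i]); // keep the trailing newline
            prev = i + 1;
        }
        if prev < buf.len() {
            spans.push(&buf[prev..]); // leftover bytes with no final newline
        }
        spans
    }

    fn main() {
        let spans = line_spans(b"1\n2\n3");
        assert_eq!(spans, vec![&b"1\n"[..], &b"2\n"[..], &b"3"[..]]);
    }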
@@ -522,65 +655,47 @@ fn split(settings: &Settings) -> UResult<()> {
Box::new(r) as Box<dyn Read>
});
if let Strategy::Number(num_chunks) = settings.strategy {
return split_into_n_chunks_by_byte(settings, &mut reader, num_chunks);
}
let mut splitter: Box<dyn Splitter> = match settings.strategy {
Strategy::Lines(chunk_size) => Box::new(LineSplitter::new(chunk_size)),
Strategy::Bytes(chunk_size) | Strategy::LineBytes(chunk_size) => {
Box::new(ByteSplitter::new(chunk_size))
match settings.strategy {
Strategy::Number(num_chunks) => {
split_into_n_chunks_by_byte(settings, &mut reader, num_chunks)
}
_ => unreachable!(),
};
// This object is responsible for creating the filename for each chunk.
let mut filename_iterator = FilenameIterator::new(
&settings.prefix,
&settings.additional_suffix,
settings.suffix_length,
settings.numeric_suffix,
);
loop {
// Get a new part file set up, and construct `writer` for it.
let filename = filename_iterator
.next()
.ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?;
let mut writer = platform::instantiate_current_writer(&settings.filter, filename.as_str());
let bytes_consumed = splitter
.consume(&mut reader, &mut writer)
.map_err_context(|| "input/output error".to_string())?;
writer
.flush()
.map_err_context(|| "error flushing to output file".to_string())?;
// If we didn't write anything we should clean up the empty file, and
// break from the loop.
if bytes_consumed == 0 {
// The output file is only ever created if --filter isn't used.
// Complicated, I know...
if settings.filter.is_none() {
remove_file(filename)
.map_err_context(|| "error removing empty file".to_string())?;
Strategy::Lines(chunk_size) => {
let mut writer = LineChunkWriter::new(chunk_size, settings)
.ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?;
match std::io::copy(&mut reader, &mut writer) {
Ok(_) => Ok(()),
Err(e) => match e.kind() {
// TODO Since the writer object controls the creation of
// new files, we need to rely on the `std::io::Result`
// returned by its `write()` method to communicate any
// errors to this calling scope. If a new file cannot be
// created because we have exceeded the number of
// allowable filenames, we use `ErrorKind::Other` to
// indicate that. A special error message needs to be
// printed in that case.
ErrorKind::Other => Err(USimpleError::new(1, "output file suffixes exhausted")),
_ => Err(uio_error!(e, "input/output error")),
},
}
break;
}
// TODO It is silly to have the "creating file" message here
// after the file has been already created. However, because
// of the way the main loop has been written, an extra file
// gets created and then deleted in the last iteration of the
// loop. So we need to make sure we are not in that case when
// printing this message.
//
// This is only here temporarily while we make some
// improvements to the architecture of the main loop in this
// function. In the future, it will move to a more appropriate
// place---at the point where the file is actually created.
if settings.verbose {
println!("creating file {}", filename.quote());
Strategy::Bytes(chunk_size) | Strategy::LineBytes(chunk_size) => {
let mut writer = ByteChunkWriter::new(chunk_size, settings)
.ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?;
match std::io::copy(&mut reader, &mut writer) {
Ok(_) => Ok(()),
Err(e) => match e.kind() {
// TODO Since the writer object controls the creation of
// new files, we need to rely on the `std::io::Result`
// returned by its `write()` method to communicate any
// errors to this calling scope. If a new file cannot be
// created because we have exceeded the number of
// allowable filenames, we use `ErrorKind::Other` to
// indicate that. A special error message needs to be
// printed in that case.
ErrorKind::Other => Err(USimpleError::new(1, "output file suffixes exhausted")),
_ => Err(uio_error!(e, "input/output error")),
},
}
}
}
Ok(())
}
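The TODO comments in both match arms describe the error-signaling convention: because the writer itself creates new output files, running out of filename suffixes has to surface through the `std::io::Result` returned by `write()`, and `ErrorKind::Other` marks that case so the caller can print a dedicated message. A minimal sketch of that convention; the `Exhausted` writer is illustrative only and not part of the PR:

    // Sketch only: surfacing "output file suffixes exhausted" to the caller
    // of std::io::copy() via an io::Error of kind ErrorKind::Other.
    use std::io::{self, ErrorKind, Write};

    struct Exhausted;

    impl Write for Exhausted {
        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
            Err(io::Error::new(ErrorKind::Other, "output file suffixes exhausted"))
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    fn main() {
        let err = io::copy(&mut &b"some input"[..], &mut Exhausted).unwrap_err();
        match err.kind() {
            // The special case the TODO comments describe.
            ErrorKind::Other => eprintln!("split: {}", err),
            // Everything else is reported as an ordinary I/O error.
            _ => eprintln!("split: input/output error: {}", err),
        }
    }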


@@ -2,7 +2,7 @@
// *
// * For the full copyright and license information, please view the LICENSE
// * file that was distributed with this source code.
// spell-checker:ignore xzaaa sixhundredfiftyonebytes ninetyonebytes asciilowercase fghij klmno pqrst uvwxyz
// spell-checker:ignore xzaaa sixhundredfiftyonebytes ninetyonebytes asciilowercase fghij klmno pqrst uvwxyz fivelines
extern crate rand;
extern crate regex;
@@ -446,7 +446,7 @@ fn test_number() {
assert_eq!(file_read("xab"), "fghij");
assert_eq!(file_read("xac"), "klmno");
assert_eq!(file_read("xad"), "pqrst");
assert_eq!(file_read("xae"), "uvwxyz");
assert_eq!(file_read("xae"), "uvwxyz\n");
}
#[test]
@@ -457,3 +457,21 @@ fn test_invalid_suffix_length() {
.no_stdout()
.stderr_contains("invalid suffix length: 'xyz'");
}
#[test]
fn test_include_newlines() {
let (at, mut ucmd) = at_and_ucmd!();
ucmd.args(&["-l", "2", "fivelines.txt"]).succeeds();
let mut s = String::new();
at.open("xaa").read_to_string(&mut s).unwrap();
assert_eq!(s, "1\n2\n");
let mut s = String::new();
at.open("xab").read_to_string(&mut s).unwrap();
assert_eq!(s, "3\n4\n");
let mut s = String::new();
at.open("xac").read_to_string(&mut s).unwrap();
assert_eq!(s, "5\n");
}


@@ -1 +1 @@
abcdefghijklmnopqrstuvwxyz
abcdefghijklmnopqrstuvwxyz

tests/fixtures/split/fivelines.txt (new file)

@@ -0,0 +1,5 @@
1
2
3
4
5