1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-28 03:27:44 +00:00

split: add ByteChunkWriter and LineChunkWriter

Add the `ByteChunkWriter` and `LineChunkWriter` structs and
implementations, but don't use them yet. This structs offer an
alternative approach to writing chunks of output (contrasted with
`ByteSplitter` and `LineSplitter`). The main difference is that
control of which underlying file is being written is inside the writer
instead of outside.
This commit is contained in:
Jeffrey Finkelstein 2021-12-30 20:01:55 -05:00
parent 47b12b31a6
commit b31d63eaa9
3 changed files with 235 additions and 1 deletions

1
Cargo.lock generated
View file

@ -2821,6 +2821,7 @@ name = "uu_split"
version = "0.0.12"
dependencies = [
"clap 3.0.10",
"memchr 2.4.1",
"uucore",
]

View file

@ -16,6 +16,7 @@ path = "src/split.rs"
[dependencies]
clap = { version = "3.0", features = ["wrap_help", "cargo"] }
memchr = "2"
uucore = { version=">=0.0.11", package="uucore", path="../../uucore" }
[[bin]]

View file

@ -17,7 +17,7 @@ use std::convert::TryFrom;
use std::env;
use std::fmt;
use std::fs::{metadata, remove_file, File};
use std::io::{stdin, BufRead, BufReader, BufWriter, Read, Write};
use std::io::{stdin, BufRead, BufReader, BufWriter, ErrorKind, Read, Write};
use std::num::ParseIntError;
use std::path::Path;
use uucore::display::Quotable;
@ -317,6 +317,238 @@ impl Settings {
}
}
/// Write a certain number of bytes to one file, then move on to another one.
///
/// This struct maintains an underlying writer representing the
/// current chunk of the output. If a call to [`write`] would cause
/// the underlying writer to write more than the allowed number of
/// bytes, a new writer is created and the excess bytes are written to
/// that one instead. As many new underlying writers are created as
/// needed to write all the bytes in the input buffer.
struct ByteChunkWriter<'a> {
/// Parameters for creating the underlying writer for each new chunk.
settings: &'a Settings,
/// The maximum number of bytes allowed for a single chunk of output.
chunk_size: usize,
/// Running total of number of chunks that have been completed.
num_chunks_written: usize,
/// Remaining capacity in number of bytes in the current chunk.
///
/// This number starts at `chunk_size` and decreases as bytes are
/// written. Once it reaches zero, a writer for a new chunk is
/// initialized and this number gets reset to `chunk_size`.
num_bytes_remaining_in_current_chunk: usize,
/// The underlying writer for the current chunk.
///
/// Once the number of bytes written to this writer exceeds
/// `chunk_size`, a new writer is initialized and assigned to this
/// field.
inner: BufWriter<Box<dyn Write>>,
/// Iterator that yields filenames for each chunk.
filename_iterator: FilenameIterator<'a>,
}
impl<'a> ByteChunkWriter<'a> {
fn new(chunk_size: usize, settings: &'a Settings) -> Option<ByteChunkWriter<'a>> {
let mut filename_iterator = FilenameIterator::new(
&settings.prefix,
&settings.additional_suffix,
settings.suffix_length,
settings.numeric_suffix,
);
let filename = filename_iterator.next()?;
if settings.verbose {
println!("creating file {}", filename.quote());
}
let inner = platform::instantiate_current_writer(&settings.filter, &filename);
Some(ByteChunkWriter {
settings,
chunk_size,
num_bytes_remaining_in_current_chunk: chunk_size,
num_chunks_written: 0,
inner,
filename_iterator,
})
}
}
impl<'a> Write for ByteChunkWriter<'a> {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
// If the length of `buf` exceeds the number of bytes remaining
// in the current chunk, we will need to write to multiple
// different underlying writers. In that case, each iteration of
// this loop writes to the underlying writer that corresponds to
// the current chunk number.
let mut carryover_bytes_written = 0;
loop {
if buf.is_empty() {
return Ok(carryover_bytes_written);
}
// If the capacity of this chunk is greater than the number of
// bytes in `buf`, then write all the bytes in `buf`. Otherwise,
// write enough bytes to fill the current chunk, then increment
// the chunk number and repeat.
let n = buf.len();
if n < self.num_bytes_remaining_in_current_chunk {
let num_bytes_written = self.inner.write(buf)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
return Ok(carryover_bytes_written + num_bytes_written);
} else {
// Write enough bytes to fill the current chunk.
let i = self.num_bytes_remaining_in_current_chunk;
let num_bytes_written = self.inner.write(&buf[..i])?;
// It's possible that the underlying writer did not
// write all the bytes.
if num_bytes_written < i {
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
return Ok(carryover_bytes_written + num_bytes_written);
} else {
// Move the window to look at only the remaining bytes.
buf = &buf[i..];
// Increment the chunk number, reset the number of
// bytes remaining, and instantiate the new
// underlying writer.
self.num_chunks_written += 1;
self.num_bytes_remaining_in_current_chunk = self.chunk_size;
// Remember for the next iteration that we wrote these bytes.
carryover_bytes_written += num_bytes_written;
// Only create the writer for the next chunk if
// there are any remaining bytes to write. This
// check prevents us from creating a new empty
// file.
if !buf.is_empty() {
let filename = self.filename_iterator.next().ok_or_else(|| {
std::io::Error::new(ErrorKind::Other, "output file suffixes exhausted")
})?;
if self.settings.verbose {
println!("creating file {}", filename.quote());
}
self.inner =
platform::instantiate_current_writer(&self.settings.filter, &filename);
}
}
}
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
/// Write a certain number of lines to one file, then move on to another one.
///
/// This struct maintains an underlying writer representing the
/// current chunk of the output. If a call to [`write`] would cause
/// the underlying writer to write more than the allowed number of
/// lines, a new writer is created and the excess lines are written to
/// that one instead. As many new underlying writers are created as
/// needed to write all the lines in the input buffer.
struct LineChunkWriter<'a> {
/// Parameters for creating the underlying writer for each new chunk.
settings: &'a Settings,
/// The maximum number of lines allowed for a single chunk of output.
chunk_size: usize,
/// Running total of number of chunks that have been completed.
num_chunks_written: usize,
/// Remaining capacity in number of lines in the current chunk.
///
/// This number starts at `chunk_size` and decreases as lines are
/// written. Once it reaches zero, a writer for a new chunk is
/// initialized and this number gets reset to `chunk_size`.
num_lines_remaining_in_current_chunk: usize,
/// The underlying writer for the current chunk.
///
/// Once the number of lines written to this writer exceeds
/// `chunk_size`, a new writer is initialized and assigned to this
/// field.
inner: BufWriter<Box<dyn Write>>,
/// Iterator that yields filenames for each chunk.
filename_iterator: FilenameIterator<'a>,
}
impl<'a> LineChunkWriter<'a> {
fn new(chunk_size: usize, settings: &'a Settings) -> Option<LineChunkWriter<'a>> {
let mut filename_iterator = FilenameIterator::new(
&settings.prefix,
&settings.additional_suffix,
settings.suffix_length,
settings.numeric_suffix,
);
let filename = filename_iterator.next()?;
if settings.verbose {
println!("creating file {}", filename.quote());
}
let inner = platform::instantiate_current_writer(&settings.filter, &filename);
Some(LineChunkWriter {
settings,
chunk_size,
num_lines_remaining_in_current_chunk: chunk_size,
num_chunks_written: 0,
inner,
filename_iterator,
})
}
}
impl<'a> Write for LineChunkWriter<'a> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// If the number of lines in `buf` exceeds the number of lines
// remaining in the current chunk, we will need to write to
// multiple different underlying writers. In that case, each
// iteration of this loop writes to the underlying writer that
// corresponds to the current chunk number.
let mut prev = 0;
let mut total_bytes_written = 0;
for i in memchr::memchr_iter(b'\n', buf) {
// If we have exceeded the number of lines to write in the
// current chunk, then start a new chunk and its
// corresponding writer.
if self.num_lines_remaining_in_current_chunk == 0 {
self.num_chunks_written += 1;
let filename = self.filename_iterator.next().ok_or_else(|| {
std::io::Error::new(ErrorKind::Other, "output file suffixes exhausted")
})?;
if self.settings.verbose {
println!("creating file {}", filename.quote());
}
self.inner = platform::instantiate_current_writer(&self.settings.filter, &filename);
self.num_lines_remaining_in_current_chunk = self.chunk_size;
}
// Write the line, starting from *after* the previous
// newline character and ending *after* the current
// newline character.
let n = self.inner.write(&buf[prev..i + 1])?;
total_bytes_written += n;
prev = i + 1;
self.num_lines_remaining_in_current_chunk -= 1;
}
let n = self.inner.write(&buf[prev..buf.len()])?;
total_bytes_written += n;
Ok(total_bytes_written)
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
trait Splitter {
// Consume as much as possible from `reader` so as to saturate `writer`.
// Equivalent to finishing one of the part files. Returns the number of