From b31d63eaa9835176cf9c9312c1addc7ae2a129ce Mon Sep 17 00:00:00 2001 From: Jeffrey Finkelstein Date: Thu, 30 Dec 2021 20:01:55 -0500 Subject: [PATCH] split: add ByteChunkWriter and LineChunkWriter Add the `ByteChunkWriter` and `LineChunkWriter` structs and implementations, but don't use them yet. This structs offer an alternative approach to writing chunks of output (contrasted with `ByteSplitter` and `LineSplitter`). The main difference is that control of which underlying file is being written is inside the writer instead of outside. --- Cargo.lock | 1 + src/uu/split/Cargo.toml | 1 + src/uu/split/src/split.rs | 234 +++++++++++++++++++++++++++++++++++++- 3 files changed, 235 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index cc7c3967b..4579e8868 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2821,6 +2821,7 @@ name = "uu_split" version = "0.0.12" dependencies = [ "clap 3.0.10", + "memchr 2.4.1", "uucore", ] diff --git a/src/uu/split/Cargo.toml b/src/uu/split/Cargo.toml index cf2e76747..f6920e797 100644 --- a/src/uu/split/Cargo.toml +++ b/src/uu/split/Cargo.toml @@ -16,6 +16,7 @@ path = "src/split.rs" [dependencies] clap = { version = "3.0", features = ["wrap_help", "cargo"] } +memchr = "2" uucore = { version=">=0.0.11", package="uucore", path="../../uucore" } [[bin]] diff --git a/src/uu/split/src/split.rs b/src/uu/split/src/split.rs index 1b6680142..c93065f7e 100644 --- a/src/uu/split/src/split.rs +++ b/src/uu/split/src/split.rs @@ -17,7 +17,7 @@ use std::convert::TryFrom; use std::env; use std::fmt; use std::fs::{metadata, remove_file, File}; -use std::io::{stdin, BufRead, BufReader, BufWriter, Read, Write}; +use std::io::{stdin, BufRead, BufReader, BufWriter, ErrorKind, Read, Write}; use std::num::ParseIntError; use std::path::Path; use uucore::display::Quotable; @@ -317,6 +317,238 @@ impl Settings { } } +/// Write a certain number of bytes to one file, then move on to another one. +/// +/// This struct maintains an underlying writer representing the +/// current chunk of the output. If a call to [`write`] would cause +/// the underlying writer to write more than the allowed number of +/// bytes, a new writer is created and the excess bytes are written to +/// that one instead. As many new underlying writers are created as +/// needed to write all the bytes in the input buffer. +struct ByteChunkWriter<'a> { + /// Parameters for creating the underlying writer for each new chunk. + settings: &'a Settings, + + /// The maximum number of bytes allowed for a single chunk of output. + chunk_size: usize, + + /// Running total of number of chunks that have been completed. + num_chunks_written: usize, + + /// Remaining capacity in number of bytes in the current chunk. + /// + /// This number starts at `chunk_size` and decreases as bytes are + /// written. Once it reaches zero, a writer for a new chunk is + /// initialized and this number gets reset to `chunk_size`. + num_bytes_remaining_in_current_chunk: usize, + + /// The underlying writer for the current chunk. + /// + /// Once the number of bytes written to this writer exceeds + /// `chunk_size`, a new writer is initialized and assigned to this + /// field. + inner: BufWriter>, + + /// Iterator that yields filenames for each chunk. + filename_iterator: FilenameIterator<'a>, +} + +impl<'a> ByteChunkWriter<'a> { + fn new(chunk_size: usize, settings: &'a Settings) -> Option> { + let mut filename_iterator = FilenameIterator::new( + &settings.prefix, + &settings.additional_suffix, + settings.suffix_length, + settings.numeric_suffix, + ); + let filename = filename_iterator.next()?; + if settings.verbose { + println!("creating file {}", filename.quote()); + } + let inner = platform::instantiate_current_writer(&settings.filter, &filename); + Some(ByteChunkWriter { + settings, + chunk_size, + num_bytes_remaining_in_current_chunk: chunk_size, + num_chunks_written: 0, + inner, + filename_iterator, + }) + } +} + +impl<'a> Write for ByteChunkWriter<'a> { + fn write(&mut self, mut buf: &[u8]) -> std::io::Result { + // If the length of `buf` exceeds the number of bytes remaining + // in the current chunk, we will need to write to multiple + // different underlying writers. In that case, each iteration of + // this loop writes to the underlying writer that corresponds to + // the current chunk number. + let mut carryover_bytes_written = 0; + loop { + if buf.is_empty() { + return Ok(carryover_bytes_written); + } + + // If the capacity of this chunk is greater than the number of + // bytes in `buf`, then write all the bytes in `buf`. Otherwise, + // write enough bytes to fill the current chunk, then increment + // the chunk number and repeat. + let n = buf.len(); + if n < self.num_bytes_remaining_in_current_chunk { + let num_bytes_written = self.inner.write(buf)?; + self.num_bytes_remaining_in_current_chunk -= num_bytes_written; + return Ok(carryover_bytes_written + num_bytes_written); + } else { + // Write enough bytes to fill the current chunk. + let i = self.num_bytes_remaining_in_current_chunk; + let num_bytes_written = self.inner.write(&buf[..i])?; + + // It's possible that the underlying writer did not + // write all the bytes. + if num_bytes_written < i { + self.num_bytes_remaining_in_current_chunk -= num_bytes_written; + return Ok(carryover_bytes_written + num_bytes_written); + } else { + // Move the window to look at only the remaining bytes. + buf = &buf[i..]; + + // Increment the chunk number, reset the number of + // bytes remaining, and instantiate the new + // underlying writer. + self.num_chunks_written += 1; + self.num_bytes_remaining_in_current_chunk = self.chunk_size; + + // Remember for the next iteration that we wrote these bytes. + carryover_bytes_written += num_bytes_written; + + // Only create the writer for the next chunk if + // there are any remaining bytes to write. This + // check prevents us from creating a new empty + // file. + if !buf.is_empty() { + let filename = self.filename_iterator.next().ok_or_else(|| { + std::io::Error::new(ErrorKind::Other, "output file suffixes exhausted") + })?; + if self.settings.verbose { + println!("creating file {}", filename.quote()); + } + self.inner = + platform::instantiate_current_writer(&self.settings.filter, &filename); + } + } + } + } + } + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + +/// Write a certain number of lines to one file, then move on to another one. +/// +/// This struct maintains an underlying writer representing the +/// current chunk of the output. If a call to [`write`] would cause +/// the underlying writer to write more than the allowed number of +/// lines, a new writer is created and the excess lines are written to +/// that one instead. As many new underlying writers are created as +/// needed to write all the lines in the input buffer. +struct LineChunkWriter<'a> { + /// Parameters for creating the underlying writer for each new chunk. + settings: &'a Settings, + + /// The maximum number of lines allowed for a single chunk of output. + chunk_size: usize, + + /// Running total of number of chunks that have been completed. + num_chunks_written: usize, + + /// Remaining capacity in number of lines in the current chunk. + /// + /// This number starts at `chunk_size` and decreases as lines are + /// written. Once it reaches zero, a writer for a new chunk is + /// initialized and this number gets reset to `chunk_size`. + num_lines_remaining_in_current_chunk: usize, + + /// The underlying writer for the current chunk. + /// + /// Once the number of lines written to this writer exceeds + /// `chunk_size`, a new writer is initialized and assigned to this + /// field. + inner: BufWriter>, + + /// Iterator that yields filenames for each chunk. + filename_iterator: FilenameIterator<'a>, +} + +impl<'a> LineChunkWriter<'a> { + fn new(chunk_size: usize, settings: &'a Settings) -> Option> { + let mut filename_iterator = FilenameIterator::new( + &settings.prefix, + &settings.additional_suffix, + settings.suffix_length, + settings.numeric_suffix, + ); + let filename = filename_iterator.next()?; + if settings.verbose { + println!("creating file {}", filename.quote()); + } + let inner = platform::instantiate_current_writer(&settings.filter, &filename); + Some(LineChunkWriter { + settings, + chunk_size, + num_lines_remaining_in_current_chunk: chunk_size, + num_chunks_written: 0, + inner, + filename_iterator, + }) + } +} + +impl<'a> Write for LineChunkWriter<'a> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + // If the number of lines in `buf` exceeds the number of lines + // remaining in the current chunk, we will need to write to + // multiple different underlying writers. In that case, each + // iteration of this loop writes to the underlying writer that + // corresponds to the current chunk number. + let mut prev = 0; + let mut total_bytes_written = 0; + for i in memchr::memchr_iter(b'\n', buf) { + // If we have exceeded the number of lines to write in the + // current chunk, then start a new chunk and its + // corresponding writer. + if self.num_lines_remaining_in_current_chunk == 0 { + self.num_chunks_written += 1; + let filename = self.filename_iterator.next().ok_or_else(|| { + std::io::Error::new(ErrorKind::Other, "output file suffixes exhausted") + })?; + if self.settings.verbose { + println!("creating file {}", filename.quote()); + } + self.inner = platform::instantiate_current_writer(&self.settings.filter, &filename); + self.num_lines_remaining_in_current_chunk = self.chunk_size; + } + + // Write the line, starting from *after* the previous + // newline character and ending *after* the current + // newline character. + let n = self.inner.write(&buf[prev..i + 1])?; + total_bytes_written += n; + prev = i + 1; + self.num_lines_remaining_in_current_chunk -= 1; + } + + let n = self.inner.write(&buf[prev..buf.len()])?; + total_bytes_written += n; + Ok(total_bytes_written) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + trait Splitter { // Consume as much as possible from `reader` so as to saturate `writer`. // Equivalent to finishing one of the part files. Returns the number of