mirror of
https://github.com/RGBCube/uutils-coreutils
synced 2025-07-28 03:27:44 +00:00
dd: buffer partial blocks in the output writer
Add buffering of partial blocks in the output block writer until they are completed.
This commit is contained in:
parent
f343b7e964
commit
ceccd2ecc6
3 changed files with 194 additions and 30 deletions
|
@ -46,10 +46,8 @@ impl<'a> BufferedOutput<'a> {
|
|||
/// Flush the partial block stored in the internal buffer.
|
||||
pub(crate) fn flush(&mut self) -> std::io::Result<WriteStat> {
|
||||
let wstat = self.inner.write_blocks(&self.buf)?;
|
||||
let n = wstat.bytes_total;
|
||||
for _ in 0..n {
|
||||
self.buf.remove(0);
|
||||
}
|
||||
let n = wstat.bytes_total.try_into().unwrap();
|
||||
self.buf.drain(0..n);
|
||||
Ok(wstat)
|
||||
}
|
||||
|
||||
|
@ -70,8 +68,19 @@ impl<'a> BufferedOutput<'a> {
|
|||
/// block. The returned [`WriteStat`] object will include the
|
||||
/// number of blocks written during execution of this function.
|
||||
pub(crate) fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result<WriteStat> {
|
||||
// Concatenate the old partial block with the new incoming bytes.
|
||||
let towrite = [&self.buf, buf].concat();
|
||||
// Split the incoming buffer into two parts: the bytes to write
|
||||
// and the bytes to buffer for next time.
|
||||
//
|
||||
// If `buf` does not include enough bytes to form a full block,
|
||||
// just buffer the whole thing and write zero blocks.
|
||||
let n = self.buf.len() + buf.len();
|
||||
let rem = n % self.inner.settings.obs;
|
||||
let i = buf.len().saturating_sub(rem);
|
||||
let (to_write, to_buffer) = buf.split_at(i);
|
||||
|
||||
// Concatenate the old partial block with the new bytes to form
|
||||
// some number of complete blocks.
|
||||
self.buf.extend_from_slice(to_write);
|
||||
|
||||
// Write all complete blocks to the inner block writer.
|
||||
//
|
||||
|
@ -79,19 +88,15 @@ impl<'a> BufferedOutput<'a> {
|
|||
// partial block were `b"ab"` and the new incoming bytes were
|
||||
// `b"cdefg"`, then we would write blocks `b"abc"` and
|
||||
// b`"def"` to the inner block writer.
|
||||
let n = towrite.len();
|
||||
let rem = n % self.inner.settings.obs;
|
||||
let wstat = self.inner.write_blocks(&towrite[..n - rem])?;
|
||||
self.buf.clear();
|
||||
let wstat = self.inner.write_blocks(&self.buf)?;
|
||||
|
||||
// Buffer any remaining bytes as a partial block.
|
||||
//
|
||||
// Continuing the example above, the last byte `b"g"` would be
|
||||
// buffered as a partial block until the next call to
|
||||
// `write_blocks()`.
|
||||
for byte in &towrite[n - rem..] {
|
||||
self.buf.push(*byte);
|
||||
}
|
||||
self.buf.clear();
|
||||
self.buf.extend_from_slice(to_buffer);
|
||||
|
||||
Ok(wstat)
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ mod numbers;
|
|||
mod parseargs;
|
||||
mod progress;
|
||||
|
||||
use crate::bufferedoutput::BufferedOutput;
|
||||
use blocks::conv_block_unblock_helper;
|
||||
use datastructures::*;
|
||||
use parseargs::Parser;
|
||||
|
@ -801,6 +802,68 @@ impl<'a> Output<'a> {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Truncate the underlying file to the current stream position, if possible.
|
||||
fn truncate(&mut self) -> std::io::Result<()> {
|
||||
self.dst.truncate()
|
||||
}
|
||||
}
|
||||
|
||||
/// The block writer either with or without partial block buffering.
|
||||
enum BlockWriter<'a> {
|
||||
/// Block writer with partial block buffering.
|
||||
///
|
||||
/// Partial blocks are buffered until completed.
|
||||
Buffered(BufferedOutput<'a>),
|
||||
|
||||
/// Block writer without partial block buffering.
|
||||
///
|
||||
/// Partial blocks are written immediately.
|
||||
Unbuffered(Output<'a>),
|
||||
}
|
||||
|
||||
impl<'a> BlockWriter<'a> {
|
||||
fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) {
|
||||
match self {
|
||||
Self::Unbuffered(o) => o.discard_cache(offset, len),
|
||||
Self::Buffered(o) => o.discard_cache(offset, len),
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<WriteStat> {
|
||||
match self {
|
||||
Self::Unbuffered(_) => Ok(WriteStat::default()),
|
||||
Self::Buffered(o) => o.flush(),
|
||||
}
|
||||
}
|
||||
|
||||
fn sync(&mut self) -> io::Result<()> {
|
||||
match self {
|
||||
Self::Unbuffered(o) => o.sync(),
|
||||
Self::Buffered(o) => o.sync(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Truncate the file to the final cursor location.
|
||||
fn truncate(&mut self) {
|
||||
// Calling `set_len()` may result in an error (for example,
|
||||
// when calling it on `/dev/null`), but we don't want to
|
||||
// terminate the process when that happens. Instead, we
|
||||
// suppress the error by calling `Result::ok()`. This matches
|
||||
// the behavior of GNU `dd` when given the command-line
|
||||
// argument `of=/dev/null`.
|
||||
match self {
|
||||
Self::Unbuffered(o) => o.truncate().ok(),
|
||||
Self::Buffered(o) => o.truncate().ok(),
|
||||
};
|
||||
}
|
||||
|
||||
fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result<WriteStat> {
|
||||
match self {
|
||||
Self::Unbuffered(o) => o.write_blocks(buf),
|
||||
Self::Buffered(o) => o.write_blocks(buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Copy the given input data to this output, consuming both.
|
||||
|
@ -814,7 +877,7 @@ impl<'a> Output<'a> {
|
|||
///
|
||||
/// If there is a problem reading from the input or writing to
|
||||
/// this output.
|
||||
fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||
fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
|
||||
// The read and write statistics.
|
||||
//
|
||||
// These objects are counters, initialized to zero. After each
|
||||
|
@ -851,6 +914,9 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
|||
let (prog_tx, rx) = mpsc::channel();
|
||||
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
|
||||
|
||||
// Whether to truncate the output file after all blocks have been written.
|
||||
let truncate = !o.settings.oconv.notrunc;
|
||||
|
||||
// Optimization: if no blocks are to be written, then don't
|
||||
// bother allocating any buffers.
|
||||
if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count {
|
||||
|
@ -875,7 +941,15 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
|||
let len = o.dst.len()?.try_into().unwrap();
|
||||
o.discard_cache(offset, len);
|
||||
}
|
||||
return finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread);
|
||||
return finalize(
|
||||
BlockWriter::Unbuffered(o),
|
||||
rstat,
|
||||
wstat,
|
||||
start,
|
||||
&prog_tx,
|
||||
output_thread,
|
||||
truncate,
|
||||
);
|
||||
};
|
||||
|
||||
// Create a common buffer with a capacity of the block size.
|
||||
|
@ -895,6 +969,16 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
|||
let mut read_offset = 0;
|
||||
let mut write_offset = 0;
|
||||
|
||||
let input_nocache = i.settings.iflags.nocache;
|
||||
let output_nocache = o.settings.oflags.nocache;
|
||||
|
||||
// Add partial block buffering, if needed.
|
||||
let mut o = if o.settings.buffered {
|
||||
BlockWriter::Buffered(BufferedOutput::new(o))
|
||||
} else {
|
||||
BlockWriter::Unbuffered(o)
|
||||
};
|
||||
|
||||
// The main read/write loop.
|
||||
//
|
||||
// Each iteration reads blocks from the input and writes
|
||||
|
@ -919,7 +1003,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
|||
//
|
||||
// TODO Better error handling for overflowing `offset` and `len`.
|
||||
let read_len = rstat_update.bytes_total;
|
||||
if i.settings.iflags.nocache {
|
||||
if input_nocache {
|
||||
let offset = read_offset.try_into().unwrap();
|
||||
let len = read_len.try_into().unwrap();
|
||||
i.discard_cache(offset, len);
|
||||
|
@ -931,7 +1015,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
|||
//
|
||||
// TODO Better error handling for overflowing `offset` and `len`.
|
||||
let write_len = wstat_update.bytes_total;
|
||||
if o.settings.oflags.nocache {
|
||||
if output_nocache {
|
||||
let offset = write_offset.try_into().unwrap();
|
||||
let len = write_len.try_into().unwrap();
|
||||
o.discard_cache(offset, len);
|
||||
|
@ -951,34 +1035,33 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
|||
prog_tx.send(prog_update).unwrap_or(());
|
||||
}
|
||||
}
|
||||
finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread)
|
||||
finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate)
|
||||
}
|
||||
|
||||
/// Flush output, print final stats, and join with the progress thread.
|
||||
fn finalize<T>(
|
||||
output: &mut Output,
|
||||
mut output: BlockWriter,
|
||||
rstat: ReadStat,
|
||||
wstat: WriteStat,
|
||||
start: Instant,
|
||||
prog_tx: &mpsc::Sender<ProgUpdate>,
|
||||
output_thread: thread::JoinHandle<T>,
|
||||
truncate: bool,
|
||||
) -> std::io::Result<()> {
|
||||
// Flush the output, if configured to do so.
|
||||
// Flush the output in case a partial write has been buffered but
|
||||
// not yet written.
|
||||
let wstat_update = output.flush()?;
|
||||
|
||||
// Sync the output, if configured to do so.
|
||||
output.sync()?;
|
||||
|
||||
// Truncate the file to the final cursor location.
|
||||
//
|
||||
// Calling `set_len()` may result in an error (for example,
|
||||
// when calling it on `/dev/null`), but we don't want to
|
||||
// terminate the process when that happens. Instead, we
|
||||
// suppress the error by calling `Result::ok()`. This matches
|
||||
// the behavior of GNU `dd` when given the command-line
|
||||
// argument `of=/dev/null`.
|
||||
if !output.settings.oconv.notrunc {
|
||||
output.dst.truncate().ok();
|
||||
if truncate {
|
||||
output.truncate();
|
||||
}
|
||||
|
||||
// Print the final read/write statistics.
|
||||
let wstat = wstat + wstat_update;
|
||||
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
|
||||
prog_tx.send(prog_update).unwrap_or(());
|
||||
// Wait for the output thread to finish
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//
|
||||
// For the full copyright and license information, please view the LICENSE
|
||||
// file that was distributed with this source code.
|
||||
// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg
|
||||
// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg fifoname
|
||||
|
||||
#[cfg(unix)]
|
||||
use crate::common::util::run_ucmd_as_root_with_stdin_stdout;
|
||||
|
@ -15,6 +15,8 @@ use regex::Regex;
|
|||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{BufReader, Read, Write};
|
||||
use std::path::PathBuf;
|
||||
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
|
||||
use std::process::{Command, Stdio};
|
||||
#[cfg(not(windows))]
|
||||
use std::thread::sleep;
|
||||
#[cfg(not(windows))]
|
||||
|
@ -1582,3 +1584,77 @@ fn test_seek_past_dev() {
|
|||
print!("TEST SKIPPED");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
|
||||
fn test_reading_partial_blocks_from_fifo() {
|
||||
// Create the FIFO.
|
||||
let ts = TestScenario::new(util_name!());
|
||||
let at = ts.fixtures.clone();
|
||||
at.mkfifo("fifo");
|
||||
let fifoname = at.plus_as_string("fifo");
|
||||
|
||||
// Start a `dd` process that reads from the fifo (so it will wait
|
||||
// until the writer process starts).
|
||||
let mut reader_command = Command::new(TESTS_BINARY);
|
||||
let child = reader_command
|
||||
.args(["dd", "ibs=3", "obs=3", &format!("if={}", fifoname)])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
// Start different processes to write to the FIFO, with a small
|
||||
// pause in between.
|
||||
let mut writer_command = Command::new("sh");
|
||||
writer_command
|
||||
.args([
|
||||
"-c",
|
||||
&format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname),
|
||||
])
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
let output = child.wait_with_output().unwrap();
|
||||
assert_eq!(output.stdout, b"abcd");
|
||||
let expected = b"0+2 records in\n1+1 records out\n4 bytes copied";
|
||||
assert!(output.stderr.starts_with(expected));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
|
||||
fn test_reading_partial_blocks_from_fifo_unbuffered() {
|
||||
// Create the FIFO.
|
||||
let ts = TestScenario::new(util_name!());
|
||||
let at = ts.fixtures.clone();
|
||||
at.mkfifo("fifo");
|
||||
let fifoname = at.plus_as_string("fifo");
|
||||
|
||||
// Start a `dd` process that reads from the fifo (so it will wait
|
||||
// until the writer process starts).
|
||||
//
|
||||
// `bs=N` takes precedence over `ibs=N` and `obs=N`.
|
||||
let mut reader_command = Command::new(TESTS_BINARY);
|
||||
let child = reader_command
|
||||
.args(["dd", "bs=3", "ibs=1", "obs=1", &format!("if={}", fifoname)])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
// Start different processes to write to the FIFO, with a small
|
||||
// pause in between.
|
||||
let mut writer_command = Command::new("sh");
|
||||
writer_command
|
||||
.args([
|
||||
"-c",
|
||||
&format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname),
|
||||
])
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
let output = child.wait_with_output().unwrap();
|
||||
assert_eq!(output.stdout, b"abcd");
|
||||
let expected = b"0+2 records in\n0+2 records out\n4 bytes copied";
|
||||
assert!(output.stderr.starts_with(expected));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue