mirror of
https://github.com/RGBCube/uutils-coreutils
synced 2025-07-28 11:37:44 +00:00
Merge pull request #4545 from jfinkels/dd-reblock
dd: buffer partial blocks in the output writer
This commit is contained in:
commit
88d63be3a0
6 changed files with 457 additions and 75 deletions
206
src/uu/dd/src/bufferedoutput.rs
Normal file
206
src/uu/dd/src/bufferedoutput.rs
Normal file
|
@ -0,0 +1,206 @@
|
||||||
|
// This file is part of the uutils coreutils package.
|
||||||
|
//
|
||||||
|
// For the full copyright and license information, please view the LICENSE
|
||||||
|
// file that was distributed with this source code.
|
||||||
|
//
|
||||||
|
// spell-checker:ignore wstat towrite cdefg bufferedoutput
|
||||||
|
//! Buffer partial output blocks until they are completed.
|
||||||
|
//!
|
||||||
|
//! Use the [`BufferedOutput`] struct to create a buffered form of the
|
||||||
|
//! [`Output`] writer.
|
||||||
|
use crate::{Output, WriteStat};
|
||||||
|
|
||||||
|
/// Buffer partial output blocks until they are completed.
|
||||||
|
///
|
||||||
|
/// Complete blocks are written immediately to the inner [`Output`],
|
||||||
|
/// but partial blocks are stored in an internal buffer until they are
|
||||||
|
/// completed.
|
||||||
|
pub(crate) struct BufferedOutput<'a> {
|
||||||
|
/// The unbuffered inner block writer.
|
||||||
|
inner: Output<'a>,
|
||||||
|
|
||||||
|
/// The internal buffer that stores a partial block.
|
||||||
|
///
|
||||||
|
/// The size of this buffer is always less than the output block
|
||||||
|
/// size (that is, the value of the `obs` command-line option).
|
||||||
|
buf: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> BufferedOutput<'a> {
|
||||||
|
/// Add partial block buffering to the given block writer.
|
||||||
|
///
|
||||||
|
/// The internal buffer size is at most the value of `obs` as
|
||||||
|
/// defined in `inner`.
|
||||||
|
pub(crate) fn new(inner: Output<'a>) -> Self {
|
||||||
|
let obs = inner.settings.obs;
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
buf: Vec::with_capacity(obs),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) {
|
||||||
|
self.inner.discard_cache(offset, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Flush the partial block stored in the internal buffer.
|
||||||
|
pub(crate) fn flush(&mut self) -> std::io::Result<WriteStat> {
|
||||||
|
let wstat = self.inner.write_blocks(&self.buf)?;
|
||||||
|
let n = wstat.bytes_total.try_into().unwrap();
|
||||||
|
self.buf.drain(0..n);
|
||||||
|
Ok(wstat)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Synchronize the inner block writer.
|
||||||
|
pub(crate) fn sync(&mut self) -> std::io::Result<()> {
|
||||||
|
self.inner.sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Truncate the underlying file to the current stream position, if possible.
|
||||||
|
pub(crate) fn truncate(&mut self) -> std::io::Result<()> {
|
||||||
|
self.inner.dst.truncate()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write the given bytes one block at a time.
|
||||||
|
///
|
||||||
|
/// Only complete blocks will be written. Partial blocks will be
|
||||||
|
/// buffered until enough bytes have been provided to complete a
|
||||||
|
/// block. The returned [`WriteStat`] object will include the
|
||||||
|
/// number of blocks written during execution of this function.
|
||||||
|
pub(crate) fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result<WriteStat> {
|
||||||
|
// Split the incoming buffer into two parts: the bytes to write
|
||||||
|
// and the bytes to buffer for next time.
|
||||||
|
//
|
||||||
|
// If `buf` does not include enough bytes to form a full block,
|
||||||
|
// just buffer the whole thing and write zero blocks.
|
||||||
|
let n = self.buf.len() + buf.len();
|
||||||
|
let rem = n % self.inner.settings.obs;
|
||||||
|
let i = buf.len().saturating_sub(rem);
|
||||||
|
let (to_write, to_buffer) = buf.split_at(i);
|
||||||
|
|
||||||
|
// Concatenate the old partial block with the new bytes to form
|
||||||
|
// some number of complete blocks.
|
||||||
|
self.buf.extend_from_slice(to_write);
|
||||||
|
|
||||||
|
// Write all complete blocks to the inner block writer.
|
||||||
|
//
|
||||||
|
// For example, if the output block size were 3, the buffered
|
||||||
|
// partial block were `b"ab"` and the new incoming bytes were
|
||||||
|
// `b"cdefg"`, then we would write blocks `b"abc"` and
|
||||||
|
// b`"def"` to the inner block writer.
|
||||||
|
let wstat = self.inner.write_blocks(&self.buf)?;
|
||||||
|
|
||||||
|
// Buffer any remaining bytes as a partial block.
|
||||||
|
//
|
||||||
|
// Continuing the example above, the last byte `b"g"` would be
|
||||||
|
// buffered as a partial block until the next call to
|
||||||
|
// `write_blocks()`.
|
||||||
|
self.buf.clear();
|
||||||
|
self.buf.extend_from_slice(to_buffer);
|
||||||
|
|
||||||
|
Ok(wstat)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::bufferedoutput::BufferedOutput;
|
||||||
|
use crate::{Dest, Output, Settings};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_buffered_output_write_blocks_empty() {
|
||||||
|
let settings = Settings {
|
||||||
|
obs: 3,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let inner = Output {
|
||||||
|
dst: Dest::Sink,
|
||||||
|
settings: &settings,
|
||||||
|
};
|
||||||
|
let mut output = BufferedOutput::new(inner);
|
||||||
|
let wstat = output.write_blocks(&[]).unwrap();
|
||||||
|
assert_eq!(wstat.writes_complete, 0);
|
||||||
|
assert_eq!(wstat.writes_partial, 0);
|
||||||
|
assert_eq!(wstat.bytes_total, 0);
|
||||||
|
assert_eq!(output.buf, vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_buffered_output_write_blocks_partial() {
|
||||||
|
let settings = Settings {
|
||||||
|
obs: 3,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let inner = Output {
|
||||||
|
dst: Dest::Sink,
|
||||||
|
settings: &settings,
|
||||||
|
};
|
||||||
|
let mut output = BufferedOutput::new(inner);
|
||||||
|
let wstat = output.write_blocks(b"ab").unwrap();
|
||||||
|
assert_eq!(wstat.writes_complete, 0);
|
||||||
|
assert_eq!(wstat.writes_partial, 0);
|
||||||
|
assert_eq!(wstat.bytes_total, 0);
|
||||||
|
assert_eq!(output.buf, b"ab");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_buffered_output_write_blocks_complete() {
|
||||||
|
let settings = Settings {
|
||||||
|
obs: 3,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let inner = Output {
|
||||||
|
dst: Dest::Sink,
|
||||||
|
settings: &settings,
|
||||||
|
};
|
||||||
|
let mut output = BufferedOutput::new(inner);
|
||||||
|
let wstat = output.write_blocks(b"abcd").unwrap();
|
||||||
|
assert_eq!(wstat.writes_complete, 1);
|
||||||
|
assert_eq!(wstat.writes_partial, 0);
|
||||||
|
assert_eq!(wstat.bytes_total, 3);
|
||||||
|
assert_eq!(output.buf, b"d");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_buffered_output_write_blocks_append() {
|
||||||
|
let settings = Settings {
|
||||||
|
obs: 3,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let inner = Output {
|
||||||
|
dst: Dest::Sink,
|
||||||
|
settings: &settings,
|
||||||
|
};
|
||||||
|
let mut output = BufferedOutput {
|
||||||
|
inner,
|
||||||
|
buf: b"ab".to_vec(),
|
||||||
|
};
|
||||||
|
let wstat = output.write_blocks(b"cdefg").unwrap();
|
||||||
|
assert_eq!(wstat.writes_complete, 2);
|
||||||
|
assert_eq!(wstat.writes_partial, 0);
|
||||||
|
assert_eq!(wstat.bytes_total, 6);
|
||||||
|
assert_eq!(output.buf, b"g");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_buffered_output_flush() {
|
||||||
|
let settings = Settings {
|
||||||
|
obs: 10,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let inner = Output {
|
||||||
|
dst: Dest::Sink,
|
||||||
|
settings: &settings,
|
||||||
|
};
|
||||||
|
let mut output = BufferedOutput {
|
||||||
|
inner,
|
||||||
|
buf: b"abc".to_vec(),
|
||||||
|
};
|
||||||
|
let wstat = output.flush().unwrap();
|
||||||
|
assert_eq!(wstat.writes_complete, 0);
|
||||||
|
assert_eq!(wstat.writes_partial, 1);
|
||||||
|
assert_eq!(wstat.bytes_total, 3);
|
||||||
|
assert_eq!(output.buf, vec![]);
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,23 +3,21 @@
|
||||||
// For the full copyright and license information, please view the LICENSE
|
// For the full copyright and license information, please view the LICENSE
|
||||||
// file that was distributed with this source code.
|
// file that was distributed with this source code.
|
||||||
|
|
||||||
// spell-checker:ignore fname, ftype, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, behaviour, bmax, bremain, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rremain, rsofar, rstat, sigusr, wlen, wstat seekable oconv canonicalized fadvise Fadvise FADV DONTNEED ESPIPE
|
// spell-checker:ignore fname, ftype, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, behaviour, bmax, bremain, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rremain, rsofar, rstat, sigusr, wlen, wstat seekable oconv canonicalized fadvise Fadvise FADV DONTNEED ESPIPE bufferedoutput
|
||||||
|
|
||||||
mod datastructures;
|
|
||||||
use datastructures::*;
|
|
||||||
|
|
||||||
mod parseargs;
|
|
||||||
use parseargs::Parser;
|
|
||||||
|
|
||||||
mod conversion_tables;
|
|
||||||
|
|
||||||
mod progress;
|
|
||||||
use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat};
|
|
||||||
|
|
||||||
mod blocks;
|
mod blocks;
|
||||||
use blocks::conv_block_unblock_helper;
|
mod bufferedoutput;
|
||||||
|
mod conversion_tables;
|
||||||
|
mod datastructures;
|
||||||
mod numbers;
|
mod numbers;
|
||||||
|
mod parseargs;
|
||||||
|
mod progress;
|
||||||
|
|
||||||
|
use crate::bufferedoutput::BufferedOutput;
|
||||||
|
use blocks::conv_block_unblock_helper;
|
||||||
|
use datastructures::*;
|
||||||
|
use parseargs::Parser;
|
||||||
|
use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat};
|
||||||
|
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::env;
|
use std::env;
|
||||||
|
@ -76,6 +74,8 @@ struct Settings {
|
||||||
oconv: OConvFlags,
|
oconv: OConvFlags,
|
||||||
oflags: OFlags,
|
oflags: OFlags,
|
||||||
status: Option<StatusLevel>,
|
status: Option<StatusLevel>,
|
||||||
|
/// Whether the output writer should buffer partial blocks until complete.
|
||||||
|
buffered: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A timer which triggers on a given interval
|
/// A timer which triggers on a given interval
|
||||||
|
@ -128,6 +128,12 @@ enum Num {
|
||||||
Bytes(u64),
|
Bytes(u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for Num {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Blocks(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Num {
|
impl Num {
|
||||||
fn force_bytes_if(self, force: bool) -> Self {
|
fn force_bytes_if(self, force: bool) -> Self {
|
||||||
match self {
|
match self {
|
||||||
|
@ -796,6 +802,68 @@ impl<'a> Output<'a> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Truncate the underlying file to the current stream position, if possible.
|
||||||
|
fn truncate(&mut self) -> std::io::Result<()> {
|
||||||
|
self.dst.truncate()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The block writer either with or without partial block buffering.
|
||||||
|
enum BlockWriter<'a> {
|
||||||
|
/// Block writer with partial block buffering.
|
||||||
|
///
|
||||||
|
/// Partial blocks are buffered until completed.
|
||||||
|
Buffered(BufferedOutput<'a>),
|
||||||
|
|
||||||
|
/// Block writer without partial block buffering.
|
||||||
|
///
|
||||||
|
/// Partial blocks are written immediately.
|
||||||
|
Unbuffered(Output<'a>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> BlockWriter<'a> {
|
||||||
|
fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) {
|
||||||
|
match self {
|
||||||
|
Self::Unbuffered(o) => o.discard_cache(offset, len),
|
||||||
|
Self::Buffered(o) => o.discard_cache(offset, len),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> io::Result<WriteStat> {
|
||||||
|
match self {
|
||||||
|
Self::Unbuffered(_) => Ok(WriteStat::default()),
|
||||||
|
Self::Buffered(o) => o.flush(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync(&mut self) -> io::Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::Unbuffered(o) => o.sync(),
|
||||||
|
Self::Buffered(o) => o.sync(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Truncate the file to the final cursor location.
|
||||||
|
fn truncate(&mut self) {
|
||||||
|
// Calling `set_len()` may result in an error (for example,
|
||||||
|
// when calling it on `/dev/null`), but we don't want to
|
||||||
|
// terminate the process when that happens. Instead, we
|
||||||
|
// suppress the error by calling `Result::ok()`. This matches
|
||||||
|
// the behavior of GNU `dd` when given the command-line
|
||||||
|
// argument `of=/dev/null`.
|
||||||
|
match self {
|
||||||
|
Self::Unbuffered(o) => o.truncate().ok(),
|
||||||
|
Self::Buffered(o) => o.truncate().ok(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result<WriteStat> {
|
||||||
|
match self {
|
||||||
|
Self::Unbuffered(o) => o.write_blocks(buf),
|
||||||
|
Self::Buffered(o) => o.write_blocks(buf),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Copy the given input data to this output, consuming both.
|
/// Copy the given input data to this output, consuming both.
|
||||||
|
@ -809,7 +877,7 @@ impl<'a> Output<'a> {
|
||||||
///
|
///
|
||||||
/// If there is a problem reading from the input or writing to
|
/// If there is a problem reading from the input or writing to
|
||||||
/// this output.
|
/// this output.
|
||||||
fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
|
||||||
// The read and write statistics.
|
// The read and write statistics.
|
||||||
//
|
//
|
||||||
// These objects are counters, initialized to zero. After each
|
// These objects are counters, initialized to zero. After each
|
||||||
|
@ -846,6 +914,9 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||||
let (prog_tx, rx) = mpsc::channel();
|
let (prog_tx, rx) = mpsc::channel();
|
||||||
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
|
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
|
||||||
|
|
||||||
|
// Whether to truncate the output file after all blocks have been written.
|
||||||
|
let truncate = !o.settings.oconv.notrunc;
|
||||||
|
|
||||||
// Optimization: if no blocks are to be written, then don't
|
// Optimization: if no blocks are to be written, then don't
|
||||||
// bother allocating any buffers.
|
// bother allocating any buffers.
|
||||||
if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count {
|
if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count {
|
||||||
|
@ -870,7 +941,15 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||||
let len = o.dst.len()?.try_into().unwrap();
|
let len = o.dst.len()?.try_into().unwrap();
|
||||||
o.discard_cache(offset, len);
|
o.discard_cache(offset, len);
|
||||||
}
|
}
|
||||||
return finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread);
|
return finalize(
|
||||||
|
BlockWriter::Unbuffered(o),
|
||||||
|
rstat,
|
||||||
|
wstat,
|
||||||
|
start,
|
||||||
|
&prog_tx,
|
||||||
|
output_thread,
|
||||||
|
truncate,
|
||||||
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Create a common buffer with a capacity of the block size.
|
// Create a common buffer with a capacity of the block size.
|
||||||
|
@ -890,13 +969,23 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||||
let mut read_offset = 0;
|
let mut read_offset = 0;
|
||||||
let mut write_offset = 0;
|
let mut write_offset = 0;
|
||||||
|
|
||||||
|
let input_nocache = i.settings.iflags.nocache;
|
||||||
|
let output_nocache = o.settings.oflags.nocache;
|
||||||
|
|
||||||
|
// Add partial block buffering, if needed.
|
||||||
|
let mut o = if o.settings.buffered {
|
||||||
|
BlockWriter::Buffered(BufferedOutput::new(o))
|
||||||
|
} else {
|
||||||
|
BlockWriter::Unbuffered(o)
|
||||||
|
};
|
||||||
|
|
||||||
// The main read/write loop.
|
// The main read/write loop.
|
||||||
//
|
//
|
||||||
// Each iteration reads blocks from the input and writes
|
// Each iteration reads blocks from the input and writes
|
||||||
// blocks to this output. Read/write statistics are updated on
|
// blocks to this output. Read/write statistics are updated on
|
||||||
// each iteration and cumulative statistics are reported to
|
// each iteration and cumulative statistics are reported to
|
||||||
// the progress reporting thread.
|
// the progress reporting thread.
|
||||||
while below_count_limit(&i.settings.count, &rstat, &wstat) {
|
while below_count_limit(&i.settings.count, &rstat) {
|
||||||
// Read a block from the input then write the block to the output.
|
// Read a block from the input then write the block to the output.
|
||||||
//
|
//
|
||||||
// As an optimization, make an educated guess about the
|
// As an optimization, make an educated guess about the
|
||||||
|
@ -914,7 +1003,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||||
//
|
//
|
||||||
// TODO Better error handling for overflowing `offset` and `len`.
|
// TODO Better error handling for overflowing `offset` and `len`.
|
||||||
let read_len = rstat_update.bytes_total;
|
let read_len = rstat_update.bytes_total;
|
||||||
if i.settings.iflags.nocache {
|
if input_nocache {
|
||||||
let offset = read_offset.try_into().unwrap();
|
let offset = read_offset.try_into().unwrap();
|
||||||
let len = read_len.try_into().unwrap();
|
let len = read_len.try_into().unwrap();
|
||||||
i.discard_cache(offset, len);
|
i.discard_cache(offset, len);
|
||||||
|
@ -926,7 +1015,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||||
//
|
//
|
||||||
// TODO Better error handling for overflowing `offset` and `len`.
|
// TODO Better error handling for overflowing `offset` and `len`.
|
||||||
let write_len = wstat_update.bytes_total;
|
let write_len = wstat_update.bytes_total;
|
||||||
if o.settings.oflags.nocache {
|
if output_nocache {
|
||||||
let offset = write_offset.try_into().unwrap();
|
let offset = write_offset.try_into().unwrap();
|
||||||
let len = write_len.try_into().unwrap();
|
let len = write_len.try_into().unwrap();
|
||||||
o.discard_cache(offset, len);
|
o.discard_cache(offset, len);
|
||||||
|
@ -946,34 +1035,33 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
|
||||||
prog_tx.send(prog_update).unwrap_or(());
|
prog_tx.send(prog_update).unwrap_or(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread)
|
finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Flush output, print final stats, and join with the progress thread.
|
/// Flush output, print final stats, and join with the progress thread.
|
||||||
fn finalize<T>(
|
fn finalize<T>(
|
||||||
output: &mut Output,
|
mut output: BlockWriter,
|
||||||
rstat: ReadStat,
|
rstat: ReadStat,
|
||||||
wstat: WriteStat,
|
wstat: WriteStat,
|
||||||
start: Instant,
|
start: Instant,
|
||||||
prog_tx: &mpsc::Sender<ProgUpdate>,
|
prog_tx: &mpsc::Sender<ProgUpdate>,
|
||||||
output_thread: thread::JoinHandle<T>,
|
output_thread: thread::JoinHandle<T>,
|
||||||
|
truncate: bool,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
// Flush the output, if configured to do so.
|
// Flush the output in case a partial write has been buffered but
|
||||||
|
// not yet written.
|
||||||
|
let wstat_update = output.flush()?;
|
||||||
|
|
||||||
|
// Sync the output, if configured to do so.
|
||||||
output.sync()?;
|
output.sync()?;
|
||||||
|
|
||||||
// Truncate the file to the final cursor location.
|
// Truncate the file to the final cursor location.
|
||||||
//
|
if truncate {
|
||||||
// Calling `set_len()` may result in an error (for example,
|
output.truncate();
|
||||||
// when calling it on `/dev/null`), but we don't want to
|
|
||||||
// terminate the process when that happens. Instead, we
|
|
||||||
// suppress the error by calling `Result::ok()`. This matches
|
|
||||||
// the behavior of GNU `dd` when given the command-line
|
|
||||||
// argument `of=/dev/null`.
|
|
||||||
if !output.settings.oconv.notrunc {
|
|
||||||
output.dst.truncate().ok();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Print the final read/write statistics.
|
// Print the final read/write statistics.
|
||||||
|
let wstat = wstat + wstat_update;
|
||||||
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
|
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
|
||||||
prog_tx.send(prog_update).unwrap_or(());
|
prog_tx.send(prog_update).unwrap_or(());
|
||||||
// Wait for the output thread to finish
|
// Wait for the output thread to finish
|
||||||
|
@ -1103,16 +1191,10 @@ fn calc_loop_bsize(
|
||||||
|
|
||||||
// Decide if the current progress is below a count=N limit or return
|
// Decide if the current progress is below a count=N limit or return
|
||||||
// true if no such limit is set.
|
// true if no such limit is set.
|
||||||
fn below_count_limit(count: &Option<Num>, rstat: &ReadStat, wstat: &WriteStat) -> bool {
|
fn below_count_limit(count: &Option<Num>, rstat: &ReadStat) -> bool {
|
||||||
match count {
|
match count {
|
||||||
Some(Num::Blocks(n)) => {
|
Some(Num::Blocks(n)) => rstat.reads_complete + rstat.reads_partial < *n,
|
||||||
let n = *n;
|
Some(Num::Bytes(n)) => rstat.bytes_total < *n,
|
||||||
rstat.reads_complete + rstat.reads_partial <= n
|
|
||||||
}
|
|
||||||
Some(Num::Bytes(n)) => {
|
|
||||||
let n = (*n).try_into().unwrap();
|
|
||||||
wstat.bytes_total <= n
|
|
||||||
}
|
|
||||||
None => true,
|
None => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,41 +35,28 @@ pub enum ParseError {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Contains a temporary state during parsing of the arguments
|
/// Contains a temporary state during parsing of the arguments
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq, Default)]
|
||||||
pub struct Parser {
|
pub struct Parser {
|
||||||
infile: Option<String>,
|
infile: Option<String>,
|
||||||
outfile: Option<String>,
|
outfile: Option<String>,
|
||||||
ibs: usize,
|
/// The block size option specified on the command-line, if any.
|
||||||
obs: usize,
|
bs: Option<usize>,
|
||||||
|
/// The input block size option specified on the command-line, if any.
|
||||||
|
ibs: Option<usize>,
|
||||||
|
/// The output block size option specified on the command-line, if any.
|
||||||
|
obs: Option<usize>,
|
||||||
cbs: Option<usize>,
|
cbs: Option<usize>,
|
||||||
skip: Num,
|
skip: Num,
|
||||||
seek: Num,
|
seek: Num,
|
||||||
count: Option<Num>,
|
count: Option<Num>,
|
||||||
conv: ConvFlags,
|
conv: ConvFlags,
|
||||||
|
/// Whether a data-transforming `conv` option has been specified.
|
||||||
|
is_conv_specified: bool,
|
||||||
iflag: IFlags,
|
iflag: IFlags,
|
||||||
oflag: OFlags,
|
oflag: OFlags,
|
||||||
status: Option<StatusLevel>,
|
status: Option<StatusLevel>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Parser {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
ibs: 512,
|
|
||||||
obs: 512,
|
|
||||||
cbs: None,
|
|
||||||
infile: None,
|
|
||||||
outfile: None,
|
|
||||||
skip: Num::Blocks(0),
|
|
||||||
seek: Num::Blocks(0),
|
|
||||||
count: None,
|
|
||||||
conv: ConvFlags::default(),
|
|
||||||
iflag: IFlags::default(),
|
|
||||||
oflag: OFlags::default(),
|
|
||||||
status: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default, PartialEq, Eq)]
|
#[derive(Debug, Default, PartialEq, Eq)]
|
||||||
pub struct ConvFlags {
|
pub struct ConvFlags {
|
||||||
ascii: bool,
|
ascii: bool,
|
||||||
|
@ -212,15 +199,34 @@ impl Parser {
|
||||||
fsync: conv.fsync,
|
fsync: conv.fsync,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Input and output block sizes.
|
||||||
|
//
|
||||||
|
// The `bs` option takes precedence. If either is not
|
||||||
|
// provided, `ibs` and `obs` are each 512 bytes by default.
|
||||||
|
let (ibs, obs) = match self.bs {
|
||||||
|
None => (self.ibs.unwrap_or(512), self.obs.unwrap_or(512)),
|
||||||
|
Some(bs) => (bs, bs),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Whether to buffer partial output blocks until they are completed.
|
||||||
|
//
|
||||||
|
// From the GNU `dd` documentation for the `bs=BYTES` option:
|
||||||
|
//
|
||||||
|
// > [...] if no data-transforming 'conv' option is specified,
|
||||||
|
// > input is copied to the output as soon as it's read, even if
|
||||||
|
// > it is smaller than the block size.
|
||||||
|
//
|
||||||
|
let buffered = self.bs.is_none() || self.is_conv_specified;
|
||||||
|
|
||||||
let skip = self
|
let skip = self
|
||||||
.skip
|
.skip
|
||||||
.force_bytes_if(self.iflag.skip_bytes)
|
.force_bytes_if(self.iflag.skip_bytes)
|
||||||
.to_bytes(self.ibs as u64);
|
.to_bytes(ibs as u64);
|
||||||
|
|
||||||
let seek = self
|
let seek = self
|
||||||
.seek
|
.seek
|
||||||
.force_bytes_if(self.oflag.seek_bytes)
|
.force_bytes_if(self.oflag.seek_bytes)
|
||||||
.to_bytes(self.obs as u64);
|
.to_bytes(obs as u64);
|
||||||
|
|
||||||
let count = self.count.map(|c| c.force_bytes_if(self.iflag.count_bytes));
|
let count = self.count.map(|c| c.force_bytes_if(self.iflag.count_bytes));
|
||||||
|
|
||||||
|
@ -230,8 +236,9 @@ impl Parser {
|
||||||
count,
|
count,
|
||||||
iconv,
|
iconv,
|
||||||
oconv,
|
oconv,
|
||||||
ibs: self.ibs,
|
ibs,
|
||||||
obs: self.obs,
|
obs,
|
||||||
|
buffered,
|
||||||
infile: self.infile,
|
infile: self.infile,
|
||||||
outfile: self.outfile,
|
outfile: self.outfile,
|
||||||
iflags: self.iflag,
|
iflags: self.iflag,
|
||||||
|
@ -244,18 +251,17 @@ impl Parser {
|
||||||
match operand.split_once('=') {
|
match operand.split_once('=') {
|
||||||
None => return Err(ParseError::UnrecognizedOperand(operand.to_string())),
|
None => return Err(ParseError::UnrecognizedOperand(operand.to_string())),
|
||||||
Some((k, v)) => match k {
|
Some((k, v)) => match k {
|
||||||
"bs" => {
|
"bs" => self.bs = Some(Self::parse_bytes(k, v)?),
|
||||||
let bs = Self::parse_bytes(k, v)?;
|
|
||||||
self.ibs = bs;
|
|
||||||
self.obs = bs;
|
|
||||||
}
|
|
||||||
"cbs" => self.cbs = Some(Self::parse_bytes(k, v)?),
|
"cbs" => self.cbs = Some(Self::parse_bytes(k, v)?),
|
||||||
"conv" => self.parse_conv_flags(v)?,
|
"conv" => {
|
||||||
|
self.is_conv_specified = true;
|
||||||
|
self.parse_conv_flags(v)?;
|
||||||
|
}
|
||||||
"count" => self.count = Some(Self::parse_n(v)?),
|
"count" => self.count = Some(Self::parse_n(v)?),
|
||||||
"ibs" => self.ibs = Self::parse_bytes(k, v)?,
|
"ibs" => self.ibs = Some(Self::parse_bytes(k, v)?),
|
||||||
"if" => self.infile = Some(v.to_string()),
|
"if" => self.infile = Some(v.to_string()),
|
||||||
"iflag" => self.parse_input_flags(v)?,
|
"iflag" => self.parse_input_flags(v)?,
|
||||||
"obs" => self.obs = Self::parse_bytes(k, v)?,
|
"obs" => self.obs = Some(Self::parse_bytes(k, v)?),
|
||||||
"of" => self.outfile = Some(v.to_string()),
|
"of" => self.outfile = Some(v.to_string()),
|
||||||
"oflag" => self.parse_output_flags(v)?,
|
"oflag" => self.parse_output_flags(v)?,
|
||||||
"seek" | "oseek" => self.seek = Self::parse_n(v)?,
|
"seek" | "oseek" => self.seek = Self::parse_n(v)?,
|
||||||
|
|
|
@ -358,6 +358,7 @@ fn parse_icf_tokens_remaining() {
|
||||||
fsync: true,
|
fsync: true,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
|
is_conv_specified: true,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
|
@ -379,6 +379,17 @@ impl std::ops::AddAssign for WriteStat {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::ops::Add for WriteStat {
|
||||||
|
type Output = Self;
|
||||||
|
fn add(self, other: Self) -> Self {
|
||||||
|
Self {
|
||||||
|
writes_complete: self.writes_complete + other.writes_complete,
|
||||||
|
writes_partial: self.writes_partial + other.writes_partial,
|
||||||
|
bytes_total: self.bytes_total + other.bytes_total,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// How much detail to report when printing transfer statistics.
|
/// How much detail to report when printing transfer statistics.
|
||||||
///
|
///
|
||||||
/// This corresponds to the available settings of the `status`
|
/// This corresponds to the available settings of the `status`
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
//
|
//
|
||||||
// For the full copyright and license information, please view the LICENSE
|
// For the full copyright and license information, please view the LICENSE
|
||||||
// file that was distributed with this source code.
|
// file that was distributed with this source code.
|
||||||
// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg
|
// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg fifoname
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use crate::common::util::run_ucmd_as_root_with_stdin_stdout;
|
use crate::common::util::run_ucmd_as_root_with_stdin_stdout;
|
||||||
|
@ -15,6 +15,8 @@ use regex::Regex;
|
||||||
use std::fs::{File, OpenOptions};
|
use std::fs::{File, OpenOptions};
|
||||||
use std::io::{BufReader, Read, Write};
|
use std::io::{BufReader, Read, Write};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
|
||||||
|
use std::process::{Command, Stdio};
|
||||||
#[cfg(not(windows))]
|
#[cfg(not(windows))]
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
#[cfg(not(windows))]
|
#[cfg(not(windows))]
|
||||||
|
@ -1582,3 +1584,77 @@ fn test_seek_past_dev() {
|
||||||
print!("TEST SKIPPED");
|
print!("TEST SKIPPED");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
|
||||||
|
fn test_reading_partial_blocks_from_fifo() {
|
||||||
|
// Create the FIFO.
|
||||||
|
let ts = TestScenario::new(util_name!());
|
||||||
|
let at = ts.fixtures.clone();
|
||||||
|
at.mkfifo("fifo");
|
||||||
|
let fifoname = at.plus_as_string("fifo");
|
||||||
|
|
||||||
|
// Start a `dd` process that reads from the fifo (so it will wait
|
||||||
|
// until the writer process starts).
|
||||||
|
let mut reader_command = Command::new(TESTS_BINARY);
|
||||||
|
let child = reader_command
|
||||||
|
.args(["dd", "ibs=3", "obs=3", &format!("if={}", fifoname)])
|
||||||
|
.stdout(Stdio::piped())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
|
.spawn()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Start different processes to write to the FIFO, with a small
|
||||||
|
// pause in between.
|
||||||
|
let mut writer_command = Command::new("sh");
|
||||||
|
writer_command
|
||||||
|
.args([
|
||||||
|
"-c",
|
||||||
|
&format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname),
|
||||||
|
])
|
||||||
|
.spawn()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let output = child.wait_with_output().unwrap();
|
||||||
|
assert_eq!(output.stdout, b"abcd");
|
||||||
|
let expected = b"0+2 records in\n1+1 records out\n4 bytes copied";
|
||||||
|
assert!(output.stderr.starts_with(expected));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
|
||||||
|
fn test_reading_partial_blocks_from_fifo_unbuffered() {
|
||||||
|
// Create the FIFO.
|
||||||
|
let ts = TestScenario::new(util_name!());
|
||||||
|
let at = ts.fixtures.clone();
|
||||||
|
at.mkfifo("fifo");
|
||||||
|
let fifoname = at.plus_as_string("fifo");
|
||||||
|
|
||||||
|
// Start a `dd` process that reads from the fifo (so it will wait
|
||||||
|
// until the writer process starts).
|
||||||
|
//
|
||||||
|
// `bs=N` takes precedence over `ibs=N` and `obs=N`.
|
||||||
|
let mut reader_command = Command::new(TESTS_BINARY);
|
||||||
|
let child = reader_command
|
||||||
|
.args(["dd", "bs=3", "ibs=1", "obs=1", &format!("if={}", fifoname)])
|
||||||
|
.stdout(Stdio::piped())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
|
.spawn()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Start different processes to write to the FIFO, with a small
|
||||||
|
// pause in between.
|
||||||
|
let mut writer_command = Command::new("sh");
|
||||||
|
writer_command
|
||||||
|
.args([
|
||||||
|
"-c",
|
||||||
|
&format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname),
|
||||||
|
])
|
||||||
|
.spawn()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let output = child.wait_with_output().unwrap();
|
||||||
|
assert_eq!(output.stdout, b"abcd");
|
||||||
|
let expected = b"0+2 records in\n0+2 records out\n4 bytes copied";
|
||||||
|
assert!(output.stderr.starts_with(expected));
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue