From 016ae34d50e2e6e5ec50fca6fc88ad257a4758a2 Mon Sep 17 00:00:00 2001 From: Jeffrey Finkelstein Date: Sun, 12 Mar 2023 19:02:48 -0400 Subject: [PATCH 1/5] dd: add Settings.buffered field Add the `Settings.buffered` field to indicate whether partial output blocks should be buffered until they are complete. --- src/uu/dd/src/dd.rs | 8 +++ src/uu/dd/src/parseargs.rs | 74 +++++++++++++++------------ src/uu/dd/src/parseargs/unit_tests.rs | 1 + 3 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index b79ae22da..7d9138791 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -76,6 +76,8 @@ struct Settings { oconv: OConvFlags, oflags: OFlags, status: Option, + /// Whether the output writer should buffer partial blocks until complete. + buffered: bool, } /// A timer which triggers on a given interval @@ -128,6 +130,12 @@ enum Num { Bytes(u64), } +impl Default for Num { + fn default() -> Self { + Self::Blocks(0) + } +} + impl Num { fn force_bytes_if(self, force: bool) -> Self { match self { diff --git a/src/uu/dd/src/parseargs.rs b/src/uu/dd/src/parseargs.rs index 0ff6e752c..60ce9a697 100644 --- a/src/uu/dd/src/parseargs.rs +++ b/src/uu/dd/src/parseargs.rs @@ -35,41 +35,28 @@ pub enum ParseError { } /// Contains a temporary state during parsing of the arguments -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Default)] pub struct Parser { infile: Option, outfile: Option, - ibs: usize, - obs: usize, + /// The block size option specified on the command-line, if any. + bs: Option, + /// The input block size option specified on the command-line, if any. + ibs: Option, + /// The output block size option specified on the command-line, if any. + obs: Option, cbs: Option, skip: Num, seek: Num, count: Option, conv: ConvFlags, + /// Whether a data-transforming `conv` option has been specified. + is_conv_specified: bool, iflag: IFlags, oflag: OFlags, status: Option, } -impl Default for Parser { - fn default() -> Self { - Self { - ibs: 512, - obs: 512, - cbs: None, - infile: None, - outfile: None, - skip: Num::Blocks(0), - seek: Num::Blocks(0), - count: None, - conv: ConvFlags::default(), - iflag: IFlags::default(), - oflag: OFlags::default(), - status: None, - } - } -} - #[derive(Debug, Default, PartialEq, Eq)] pub struct ConvFlags { ascii: bool, @@ -212,15 +199,34 @@ impl Parser { fsync: conv.fsync, }; + // Input and output block sizes. + // + // The `bs` option takes precedence. If either is not + // provided, `ibs` and `obs` are each 512 bytes by default. + let (ibs, obs) = match self.bs { + None => (self.ibs.unwrap_or(512), self.obs.unwrap_or(512)), + Some(bs) => (bs, bs), + }; + + // Whether to buffer partial output blocks until they are completed. + // + // From the GNU `dd` documentation for the `bs=BYTES` option: + // + // > [...] if no data-transforming 'conv' option is specified, + // > input is copied to the output as soon as it's read, even if + // > it is smaller than the block size. + // + let buffered = self.bs.is_none() || self.is_conv_specified; + let skip = self .skip .force_bytes_if(self.iflag.skip_bytes) - .to_bytes(self.ibs as u64); + .to_bytes(ibs as u64); let seek = self .seek .force_bytes_if(self.oflag.seek_bytes) - .to_bytes(self.obs as u64); + .to_bytes(obs as u64); let count = self.count.map(|c| c.force_bytes_if(self.iflag.count_bytes)); @@ -230,8 +236,9 @@ impl Parser { count, iconv, oconv, - ibs: self.ibs, - obs: self.obs, + ibs, + obs, + buffered, infile: self.infile, outfile: self.outfile, iflags: self.iflag, @@ -244,18 +251,17 @@ impl Parser { match operand.split_once('=') { None => return Err(ParseError::UnrecognizedOperand(operand.to_string())), Some((k, v)) => match k { - "bs" => { - let bs = Self::parse_bytes(k, v)?; - self.ibs = bs; - self.obs = bs; - } + "bs" => self.bs = Some(Self::parse_bytes(k, v)?), "cbs" => self.cbs = Some(Self::parse_bytes(k, v)?), - "conv" => self.parse_conv_flags(v)?, + "conv" => { + self.is_conv_specified = true; + self.parse_conv_flags(v)?; + } "count" => self.count = Some(Self::parse_n(v)?), - "ibs" => self.ibs = Self::parse_bytes(k, v)?, + "ibs" => self.ibs = Some(Self::parse_bytes(k, v)?), "if" => self.infile = Some(v.to_string()), "iflag" => self.parse_input_flags(v)?, - "obs" => self.obs = Self::parse_bytes(k, v)?, + "obs" => self.obs = Some(Self::parse_bytes(k, v)?), "of" => self.outfile = Some(v.to_string()), "oflag" => self.parse_output_flags(v)?, "seek" | "oseek" => self.seek = Self::parse_n(v)?, diff --git a/src/uu/dd/src/parseargs/unit_tests.rs b/src/uu/dd/src/parseargs/unit_tests.rs index 142e49fd0..51b0933e9 100644 --- a/src/uu/dd/src/parseargs/unit_tests.rs +++ b/src/uu/dd/src/parseargs/unit_tests.rs @@ -358,6 +358,7 @@ fn parse_icf_tokens_remaining() { fsync: true, ..Default::default() }, + is_conv_specified: true, ..Default::default() }) ); From 5142f35f8395130dd331cc5756a0cd0466a1e74c Mon Sep 17 00:00:00 2001 From: Jeffrey Finkelstein Date: Fri, 17 Mar 2023 20:55:49 -0400 Subject: [PATCH 2/5] dd: add BufferedOutput to buffer partial blocks --- src/uu/dd/src/bufferedoutput.rs | 201 ++++++++++++++++++++++++++++++++ src/uu/dd/src/dd.rs | 25 ++-- 2 files changed, 212 insertions(+), 14 deletions(-) create mode 100644 src/uu/dd/src/bufferedoutput.rs diff --git a/src/uu/dd/src/bufferedoutput.rs b/src/uu/dd/src/bufferedoutput.rs new file mode 100644 index 000000000..1735ae10d --- /dev/null +++ b/src/uu/dd/src/bufferedoutput.rs @@ -0,0 +1,201 @@ +// This file is part of the uutils coreutils package. +// +// For the full copyright and license information, please view the LICENSE +// file that was distributed with this source code. +// +// spell-checker:ignore wstat towrite cdefg bufferedoutput +//! Buffer partial output blocks until they are completed. +//! +//! Use the [`BufferedOutput`] struct to create a buffered form of the +//! [`Output`] writer. +use crate::{Output, WriteStat}; + +/// Buffer partial output blocks until they are completed. +/// +/// Complete blocks are written immediately to the inner [`Output`], +/// but partial blocks are stored in an internal buffer until they are +/// completed. +pub(crate) struct BufferedOutput<'a> { + /// The unbuffered inner block writer. + inner: Output<'a>, + + /// The internal buffer that stores a partial block. + /// + /// The size of this buffer is always less than the output block + /// size (that is, the value of the `obs` command-line option). + buf: Vec, +} + +impl<'a> BufferedOutput<'a> { + /// Add partial block buffering to the given block writer. + /// + /// The internal buffer size is at most the value of `obs` as + /// defined in `inner`. + pub(crate) fn new(inner: Output<'a>) -> Self { + let obs = inner.settings.obs; + Self { + inner, + buf: Vec::with_capacity(obs), + } + } + + pub(crate) fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) { + self.inner.discard_cache(offset, len); + } + + /// Flush the partial block stored in the internal buffer. + pub(crate) fn flush(&mut self) -> std::io::Result { + let wstat = self.inner.write_blocks(&self.buf)?; + let n = wstat.bytes_total; + for _ in 0..n { + self.buf.remove(0); + } + Ok(wstat) + } + + /// Synchronize the inner block writer. + pub(crate) fn sync(&mut self) -> std::io::Result<()> { + self.inner.sync() + } + + /// Truncate the underlying file to the current stream position, if possible. + pub(crate) fn truncate(&mut self) -> std::io::Result<()> { + self.inner.dst.truncate() + } + + /// Write the given bytes one block at a time. + /// + /// Only complete blocks will be written. Partial blocks will be + /// buffered until enough bytes have been provided to complete a + /// block. The returned [`WriteStat`] object will include the + /// number of blocks written during execution of this function. + pub(crate) fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result { + // Concatenate the old partial block with the new incoming bytes. + let towrite = [&self.buf, buf].concat(); + + // Write all complete blocks to the inner block writer. + // + // For example, if the output block size were 3, the buffered + // partial block were `b"ab"` and the new incoming bytes were + // `b"cdefg"`, then we would write blocks `b"abc"` and + // b`"def"` to the inner block writer. + let n = towrite.len(); + let rem = n % self.inner.settings.obs; + let wstat = self.inner.write_blocks(&towrite[..n - rem])?; + self.buf.clear(); + + // Buffer any remaining bytes as a partial block. + // + // Continuing the example above, the last byte `b"g"` would be + // buffered as a partial block until the next call to + // `write_blocks()`. + for byte in &towrite[n - rem..] { + self.buf.push(*byte); + } + + Ok(wstat) + } +} + +#[cfg(unix)] +#[cfg(test)] +mod tests { + use crate::bufferedoutput::BufferedOutput; + use crate::{Dest, Output, Settings}; + + #[test] + fn test_buffered_output_write_blocks_empty() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput::new(inner); + let wstat = output.write_blocks(&[]).unwrap(); + assert_eq!(wstat.writes_complete, 0); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 0); + assert_eq!(output.buf, vec![]); + } + + #[test] + fn test_buffered_output_write_blocks_partial() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput::new(inner); + let wstat = output.write_blocks(b"ab").unwrap(); + assert_eq!(wstat.writes_complete, 0); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 0); + assert_eq!(output.buf, b"ab"); + } + + #[test] + fn test_buffered_output_write_blocks_complete() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput::new(inner); + let wstat = output.write_blocks(b"abcd").unwrap(); + assert_eq!(wstat.writes_complete, 1); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 3); + assert_eq!(output.buf, b"d"); + } + + #[test] + fn test_buffered_output_write_blocks_append() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput { + inner, + buf: b"ab".to_vec(), + }; + let wstat = output.write_blocks(b"cdefg").unwrap(); + assert_eq!(wstat.writes_complete, 2); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 6); + assert_eq!(output.buf, b"g"); + } + + #[test] + fn test_buffered_output_flush() { + let settings = Settings { + obs: 10, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput { + inner, + buf: b"abc".to_vec(), + }; + let wstat = output.flush().unwrap(); + assert_eq!(wstat.writes_complete, 0); + assert_eq!(wstat.writes_partial, 1); + assert_eq!(wstat.bytes_total, 3); + assert_eq!(output.buf, vec![]); + } +} diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 7d9138791..9374ca0cd 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -3,23 +3,20 @@ // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore fname, ftype, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, behaviour, bmax, bremain, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rremain, rsofar, rstat, sigusr, wlen, wstat seekable oconv canonicalized fadvise Fadvise FADV DONTNEED ESPIPE - -mod datastructures; -use datastructures::*; - -mod parseargs; -use parseargs::Parser; - -mod conversion_tables; - -mod progress; -use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat}; +// spell-checker:ignore fname, ftype, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, behaviour, bmax, bremain, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rremain, rsofar, rstat, sigusr, wlen, wstat seekable oconv canonicalized fadvise Fadvise FADV DONTNEED ESPIPE bufferedoutput mod blocks; -use blocks::conv_block_unblock_helper; - +mod bufferedoutput; +mod conversion_tables; +mod datastructures; mod numbers; +mod parseargs; +mod progress; + +use blocks::conv_block_unblock_helper; +use datastructures::*; +use parseargs::Parser; +use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat}; use std::cmp; use std::env; From b383e609988a7f171643a387a81c512cab1257be Mon Sep 17 00:00:00 2001 From: Jeffrey Finkelstein Date: Fri, 17 Mar 2023 22:41:10 -0400 Subject: [PATCH 3/5] dd: implement Add for WriteStat --- src/uu/dd/src/progress.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/uu/dd/src/progress.rs b/src/uu/dd/src/progress.rs index 4fe04cb0e..ac7517c2c 100644 --- a/src/uu/dd/src/progress.rs +++ b/src/uu/dd/src/progress.rs @@ -379,6 +379,17 @@ impl std::ops::AddAssign for WriteStat { } } +impl std::ops::Add for WriteStat { + type Output = Self; + fn add(self, other: Self) -> Self { + Self { + writes_complete: self.writes_complete + other.writes_complete, + writes_partial: self.writes_partial + other.writes_partial, + bytes_total: self.bytes_total + other.bytes_total, + } + } +} + /// How much detail to report when printing transfer statistics. /// /// This corresponds to the available settings of the `status` From f343b7e964091507e9373c94da4619c962c8d23c Mon Sep 17 00:00:00 2001 From: Jeffrey Finkelstein Date: Fri, 17 Mar 2023 22:41:56 -0400 Subject: [PATCH 4/5] dd: use read statistics for termination condition Correct the behavior of `dd` so that the termination condition of the main loop uses the number of bytes read, not the number of bytes written, when the `count` command-line option is given in bytes instead of blocks. --- src/uu/dd/src/dd.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 9374ca0cd..b760d98e0 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -901,7 +901,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // blocks to this output. Read/write statistics are updated on // each iteration and cumulative statistics are reported to // the progress reporting thread. - while below_count_limit(&i.settings.count, &rstat, &wstat) { + while below_count_limit(&i.settings.count, &rstat) { // Read a block from the input then write the block to the output. // // As an optimization, make an educated guess about the @@ -1108,16 +1108,10 @@ fn calc_loop_bsize( // Decide if the current progress is below a count=N limit or return // true if no such limit is set. -fn below_count_limit(count: &Option, rstat: &ReadStat, wstat: &WriteStat) -> bool { +fn below_count_limit(count: &Option, rstat: &ReadStat) -> bool { match count { - Some(Num::Blocks(n)) => { - let n = *n; - rstat.reads_complete + rstat.reads_partial <= n - } - Some(Num::Bytes(n)) => { - let n = (*n).try_into().unwrap(); - wstat.bytes_total <= n - } + Some(Num::Blocks(n)) => rstat.reads_complete + rstat.reads_partial < *n, + Some(Num::Bytes(n)) => rstat.bytes_total < *n, None => true, } } From ceccd2ecc61ed83d9c66ac55f82913723ca4d96e Mon Sep 17 00:00:00 2001 From: Jeffrey Finkelstein Date: Fri, 17 Mar 2023 22:42:24 -0400 Subject: [PATCH 5/5] dd: buffer partial blocks in the output writer Add buffering of partial blocks in the output block writer until they are completed. --- src/uu/dd/src/bufferedoutput.rs | 31 +++++---- src/uu/dd/src/dd.rs | 115 +++++++++++++++++++++++++++----- tests/by-util/test_dd.rs | 78 +++++++++++++++++++++- 3 files changed, 194 insertions(+), 30 deletions(-) diff --git a/src/uu/dd/src/bufferedoutput.rs b/src/uu/dd/src/bufferedoutput.rs index 1735ae10d..6ac3b4300 100644 --- a/src/uu/dd/src/bufferedoutput.rs +++ b/src/uu/dd/src/bufferedoutput.rs @@ -46,10 +46,8 @@ impl<'a> BufferedOutput<'a> { /// Flush the partial block stored in the internal buffer. pub(crate) fn flush(&mut self) -> std::io::Result { let wstat = self.inner.write_blocks(&self.buf)?; - let n = wstat.bytes_total; - for _ in 0..n { - self.buf.remove(0); - } + let n = wstat.bytes_total.try_into().unwrap(); + self.buf.drain(0..n); Ok(wstat) } @@ -70,8 +68,19 @@ impl<'a> BufferedOutput<'a> { /// block. The returned [`WriteStat`] object will include the /// number of blocks written during execution of this function. pub(crate) fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result { - // Concatenate the old partial block with the new incoming bytes. - let towrite = [&self.buf, buf].concat(); + // Split the incoming buffer into two parts: the bytes to write + // and the bytes to buffer for next time. + // + // If `buf` does not include enough bytes to form a full block, + // just buffer the whole thing and write zero blocks. + let n = self.buf.len() + buf.len(); + let rem = n % self.inner.settings.obs; + let i = buf.len().saturating_sub(rem); + let (to_write, to_buffer) = buf.split_at(i); + + // Concatenate the old partial block with the new bytes to form + // some number of complete blocks. + self.buf.extend_from_slice(to_write); // Write all complete blocks to the inner block writer. // @@ -79,19 +88,15 @@ impl<'a> BufferedOutput<'a> { // partial block were `b"ab"` and the new incoming bytes were // `b"cdefg"`, then we would write blocks `b"abc"` and // b`"def"` to the inner block writer. - let n = towrite.len(); - let rem = n % self.inner.settings.obs; - let wstat = self.inner.write_blocks(&towrite[..n - rem])?; - self.buf.clear(); + let wstat = self.inner.write_blocks(&self.buf)?; // Buffer any remaining bytes as a partial block. // // Continuing the example above, the last byte `b"g"` would be // buffered as a partial block until the next call to // `write_blocks()`. - for byte in &towrite[n - rem..] { - self.buf.push(*byte); - } + self.buf.clear(); + self.buf.extend_from_slice(to_buffer); Ok(wstat) } diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index b760d98e0..645c24967 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -13,6 +13,7 @@ mod numbers; mod parseargs; mod progress; +use crate::bufferedoutput::BufferedOutput; use blocks::conv_block_unblock_helper; use datastructures::*; use parseargs::Parser; @@ -801,6 +802,68 @@ impl<'a> Output<'a> { Ok(()) } } + + /// Truncate the underlying file to the current stream position, if possible. + fn truncate(&mut self) -> std::io::Result<()> { + self.dst.truncate() + } +} + +/// The block writer either with or without partial block buffering. +enum BlockWriter<'a> { + /// Block writer with partial block buffering. + /// + /// Partial blocks are buffered until completed. + Buffered(BufferedOutput<'a>), + + /// Block writer without partial block buffering. + /// + /// Partial blocks are written immediately. + Unbuffered(Output<'a>), +} + +impl<'a> BlockWriter<'a> { + fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) { + match self { + Self::Unbuffered(o) => o.discard_cache(offset, len), + Self::Buffered(o) => o.discard_cache(offset, len), + } + } + + fn flush(&mut self) -> io::Result { + match self { + Self::Unbuffered(_) => Ok(WriteStat::default()), + Self::Buffered(o) => o.flush(), + } + } + + fn sync(&mut self) -> io::Result<()> { + match self { + Self::Unbuffered(o) => o.sync(), + Self::Buffered(o) => o.sync(), + } + } + + /// Truncate the file to the final cursor location. + fn truncate(&mut self) { + // Calling `set_len()` may result in an error (for example, + // when calling it on `/dev/null`), but we don't want to + // terminate the process when that happens. Instead, we + // suppress the error by calling `Result::ok()`. This matches + // the behavior of GNU `dd` when given the command-line + // argument `of=/dev/null`. + match self { + Self::Unbuffered(o) => o.truncate().ok(), + Self::Buffered(o) => o.truncate().ok(), + }; + } + + fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result { + match self { + Self::Unbuffered(o) => o.write_blocks(buf), + Self::Buffered(o) => o.write_blocks(buf), + } + } } /// Copy the given input data to this output, consuming both. @@ -814,7 +877,7 @@ impl<'a> Output<'a> { /// /// If there is a problem reading from the input or writing to /// this output. -fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { +fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> { // The read and write statistics. // // These objects are counters, initialized to zero. After each @@ -851,6 +914,9 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { let (prog_tx, rx) = mpsc::channel(); let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status)); + // Whether to truncate the output file after all blocks have been written. + let truncate = !o.settings.oconv.notrunc; + // Optimization: if no blocks are to be written, then don't // bother allocating any buffers. if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count { @@ -875,7 +941,15 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { let len = o.dst.len()?.try_into().unwrap(); o.discard_cache(offset, len); } - return finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread); + return finalize( + BlockWriter::Unbuffered(o), + rstat, + wstat, + start, + &prog_tx, + output_thread, + truncate, + ); }; // Create a common buffer with a capacity of the block size. @@ -895,6 +969,16 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { let mut read_offset = 0; let mut write_offset = 0; + let input_nocache = i.settings.iflags.nocache; + let output_nocache = o.settings.oflags.nocache; + + // Add partial block buffering, if needed. + let mut o = if o.settings.buffered { + BlockWriter::Buffered(BufferedOutput::new(o)) + } else { + BlockWriter::Unbuffered(o) + }; + // The main read/write loop. // // Each iteration reads blocks from the input and writes @@ -919,7 +1003,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // // TODO Better error handling for overflowing `offset` and `len`. let read_len = rstat_update.bytes_total; - if i.settings.iflags.nocache { + if input_nocache { let offset = read_offset.try_into().unwrap(); let len = read_len.try_into().unwrap(); i.discard_cache(offset, len); @@ -931,7 +1015,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // // TODO Better error handling for overflowing `offset` and `len`. let write_len = wstat_update.bytes_total; - if o.settings.oflags.nocache { + if output_nocache { let offset = write_offset.try_into().unwrap(); let len = write_len.try_into().unwrap(); o.discard_cache(offset, len); @@ -951,34 +1035,33 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { prog_tx.send(prog_update).unwrap_or(()); } } - finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread) + finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate) } /// Flush output, print final stats, and join with the progress thread. fn finalize( - output: &mut Output, + mut output: BlockWriter, rstat: ReadStat, wstat: WriteStat, start: Instant, prog_tx: &mpsc::Sender, output_thread: thread::JoinHandle, + truncate: bool, ) -> std::io::Result<()> { - // Flush the output, if configured to do so. + // Flush the output in case a partial write has been buffered but + // not yet written. + let wstat_update = output.flush()?; + + // Sync the output, if configured to do so. output.sync()?; // Truncate the file to the final cursor location. - // - // Calling `set_len()` may result in an error (for example, - // when calling it on `/dev/null`), but we don't want to - // terminate the process when that happens. Instead, we - // suppress the error by calling `Result::ok()`. This matches - // the behavior of GNU `dd` when given the command-line - // argument `of=/dev/null`. - if !output.settings.oconv.notrunc { - output.dst.truncate().ok(); + if truncate { + output.truncate(); } // Print the final read/write statistics. + let wstat = wstat + wstat_update; let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true); prog_tx.send(prog_update).unwrap_or(()); // Wait for the output thread to finish diff --git a/tests/by-util/test_dd.rs b/tests/by-util/test_dd.rs index d5ac8dc80..a4c70097c 100644 --- a/tests/by-util/test_dd.rs +++ b/tests/by-util/test_dd.rs @@ -2,7 +2,7 @@ // // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg +// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg fifoname #[cfg(unix)] use crate::common::util::run_ucmd_as_root_with_stdin_stdout; @@ -15,6 +15,8 @@ use regex::Regex; use std::fs::{File, OpenOptions}; use std::io::{BufReader, Read, Write}; use std::path::PathBuf; +#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))] +use std::process::{Command, Stdio}; #[cfg(not(windows))] use std::thread::sleep; #[cfg(not(windows))] @@ -1582,3 +1584,77 @@ fn test_seek_past_dev() { print!("TEST SKIPPED"); } } + +#[test] +#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))] +fn test_reading_partial_blocks_from_fifo() { + // Create the FIFO. + let ts = TestScenario::new(util_name!()); + let at = ts.fixtures.clone(); + at.mkfifo("fifo"); + let fifoname = at.plus_as_string("fifo"); + + // Start a `dd` process that reads from the fifo (so it will wait + // until the writer process starts). + let mut reader_command = Command::new(TESTS_BINARY); + let child = reader_command + .args(["dd", "ibs=3", "obs=3", &format!("if={}", fifoname)]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + + // Start different processes to write to the FIFO, with a small + // pause in between. + let mut writer_command = Command::new("sh"); + writer_command + .args([ + "-c", + &format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname), + ]) + .spawn() + .unwrap(); + + let output = child.wait_with_output().unwrap(); + assert_eq!(output.stdout, b"abcd"); + let expected = b"0+2 records in\n1+1 records out\n4 bytes copied"; + assert!(output.stderr.starts_with(expected)); +} + +#[test] +#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))] +fn test_reading_partial_blocks_from_fifo_unbuffered() { + // Create the FIFO. + let ts = TestScenario::new(util_name!()); + let at = ts.fixtures.clone(); + at.mkfifo("fifo"); + let fifoname = at.plus_as_string("fifo"); + + // Start a `dd` process that reads from the fifo (so it will wait + // until the writer process starts). + // + // `bs=N` takes precedence over `ibs=N` and `obs=N`. + let mut reader_command = Command::new(TESTS_BINARY); + let child = reader_command + .args(["dd", "bs=3", "ibs=1", "obs=1", &format!("if={}", fifoname)]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + + // Start different processes to write to the FIFO, with a small + // pause in between. + let mut writer_command = Command::new("sh"); + writer_command + .args([ + "-c", + &format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname), + ]) + .spawn() + .unwrap(); + + let output = child.wait_with_output().unwrap(); + assert_eq!(output.stdout, b"abcd"); + let expected = b"0+2 records in\n0+2 records out\n4 bytes copied"; + assert!(output.stderr.starts_with(expected)); +}