1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-28 11:37:44 +00:00

Merge pull request #3095 from jfinkels/dd-concise-main-loop

dd: make main loop more concise
This commit is contained in:
Sylvestre Ledru 2022-02-08 20:44:39 +01:00 committed by GitHub
commit 12d5139320
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 133 additions and 82 deletions

View file

@ -19,12 +19,34 @@ pub struct ProgUpdate {
pub duration: time::Duration, pub duration: time::Duration,
} }
impl ProgUpdate {
pub(crate) fn new(
read_stat: ReadStat,
write_stat: WriteStat,
duration: time::Duration,
) -> Self {
Self {
read_stat,
write_stat,
duration,
}
}
}
#[derive(Clone, Copy, Default)] #[derive(Clone, Copy, Default)]
pub struct ReadStat { pub struct ReadStat {
pub reads_complete: u64, pub reads_complete: u64,
pub reads_partial: u64, pub reads_partial: u64,
pub records_truncated: u32, pub records_truncated: u32,
} }
impl ReadStat {
    /// Report whether no reads at all have been counted.
    ///
    /// Returns `true` exactly when both the complete-read counter and the
    /// partial-read counter are zero. The `records_truncated` counter is
    /// not consulted.
    pub(crate) fn is_empty(&self) -> bool {
        matches!(
            *self,
            Self {
                reads_complete: 0,
                reads_partial: 0,
                ..
            }
        )
    }
}
impl std::ops::AddAssign for ReadStat { impl std::ops::AddAssign for ReadStat {
fn add_assign(&mut self, other: Self) { fn add_assign(&mut self, other: Self) {
*self = Self { *self = Self {
@ -35,7 +57,7 @@ impl std::ops::AddAssign for ReadStat {
} }
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy, Default)]
pub struct WriteStat { pub struct WriteStat {
pub writes_complete: u64, pub writes_complete: u64,
pub writes_partial: u64, pub writes_partial: u64,

View file

@ -347,78 +347,111 @@ where
}) })
} }
fn dd_out<R: Read>(mut self, mut i: Input<R>) -> UResult<()> { /// Print the read/write statistics.
let mut rstat = ReadStat { fn print_stats<R: Read>(&self, i: &Input<R>, prog_update: &ProgUpdate) {
reads_complete: 0,
reads_partial: 0,
records_truncated: 0,
};
let mut wstat = WriteStat {
writes_complete: 0,
writes_partial: 0,
bytes_total: 0,
};
let start = time::Instant::now();
let bsize = calc_bsize(i.ibs, self.obs);
let prog_tx = {
let (tx, rx) = mpsc::channel();
thread::spawn(gen_prog_updater(rx, i.print_level));
tx
};
while below_count_limit(&i.count, &rstat, &wstat) {
// Read/Write
let loop_bsize = calc_loop_bsize(&i.count, &rstat, &wstat, i.ibs, bsize);
match read_helper(&mut i, loop_bsize)? {
(
ReadStat {
reads_complete: 0,
reads_partial: 0,
..
},
_,
) => break,
(rstat_update, buf) => {
let wstat_update = self
.write_blocks(&buf)
.map_err_context(|| "failed to write output".to_string())?;
rstat += rstat_update;
wstat += wstat_update;
}
};
// Update Prog
prog_tx
.send(ProgUpdate {
read_stat: rstat,
write_stat: wstat,
duration: start.elapsed(),
})
.map_err(|_| USimpleError::new(1, "failed to write output"))?;
}
if self.cflags.fsync {
self.fsync()
.map_err_context(|| "failed to write output".to_string())?;
} else if self.cflags.fdatasync {
self.fdatasync()
.map_err_context(|| "failed to write output".to_string())?;
}
match i.print_level { match i.print_level {
Some(StatusLevel::None) => {} Some(StatusLevel::None) => {}
Some(StatusLevel::Noxfer) => print_io_lines(&ProgUpdate { Some(StatusLevel::Noxfer) => print_io_lines(prog_update),
read_stat: rstat, Some(StatusLevel::Progress) | None => print_transfer_stats(prog_update),
write_stat: wstat,
duration: start.elapsed(),
}),
Some(StatusLevel::Progress) | None => print_transfer_stats(&ProgUpdate {
read_stat: rstat,
write_stat: wstat,
duration: start.elapsed(),
}),
} }
}
/// Flush the output to disk when the conversion flags ask for it.
///
/// The `fsync` flag takes precedence over `fdatasync`. When neither
/// flag is set, this is a no-op that returns `Ok(())`.
fn sync(&mut self) -> std::io::Result<()> {
    match (self.cflags.fsync, self.cflags.fdatasync) {
        (true, _) => self.fsync(),
        (false, true) => self.fdatasync(),
        // Neither flag was requested: nothing to flush.
        (false, false) => Ok(()),
    }
}
/// Copy the given input data to this output, consuming both.
///
/// This method contains the main loop for the `dd` program. Bytes
/// are read in blocks from `i` and written in blocks to this
/// output. Read/write statistics are reported to stderr as
/// configured by the `status` command-line argument.
///
/// # Errors
///
/// If there is a problem reading from the input, writing to this
/// output, or flushing this output after the main loop finishes.
fn dd_out<R: Read>(mut self, mut i: Input<R>) -> std::io::Result<()> {
// The read and write statistics.
//
// These objects are counters, initialized to zero. After each
// iteration of the main loop, each will be incremented by the
// number of blocks read and written, respectively.
let mut rstat = Default::default();
let mut wstat = Default::default();
// The time at which the main loop starts executing.
//
// When `status=progress` is given on the command-line, the
// `dd` program reports its progress every second or so. Part
// of its report includes the throughput in bytes per second,
// which requires knowing how long the process has been
// running.
let start = time::Instant::now();
// A good buffer size for reading.
//
// This is an educated guess about a good buffer size based on
// the input and output block sizes.
let bsize = calc_bsize(i.ibs, self.obs);
// Start a thread that reports transfer progress.
//
// When `status=progress` is given on the command-line, the
// `dd` program reports its progress every second or so. We
// perform this reporting in a new thread so as not to take
// any CPU time away from the actual reading and writing of
// data. We send a `ProgUpdate` from the transmitter `prog_tx`
// to the receiver `rx`, and the receiver prints the transfer
// information.
let (prog_tx, rx) = mpsc::channel();
thread::spawn(gen_prog_updater(rx, i.print_level));
// The main read/write loop.
//
// Each iteration reads blocks from the input and writes
// blocks to this output. Read/write statistics are updated on
// each iteration and cumulative statistics are reported to
// the progress reporting thread.
while below_count_limit(&i.count, &rstat, &wstat) {
// Read a block from the input then write the block to the output.
//
// As an optimization, make an educated guess about the
// best buffer size for reading based on the number of
// blocks already read and the number of blocks remaining.
let loop_bsize = calc_loop_bsize(&i.count, &rstat, &wstat, i.ibs, bsize);
let (rstat_update, buf) = read_helper(&mut i, loop_bsize)?;
// Stop as soon as a read yields neither complete nor partial
// reads; nothing is written or counted for that iteration.
if rstat_update.is_empty() {
break;
}
let wstat_update = self.write_blocks(&buf)?;
// Update the read/write stats and inform the progress thread.
//
// If the receiver is disconnected, `send()` returns an
// error. Since it is just reporting progress and is not
// crucial to the operation of `dd`, let's just ignore the
// error.
rstat += rstat_update;
wstat += wstat_update;
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed());
prog_tx.send(prog_update).unwrap_or(());
}
// Flush the output, if configured to do so.
self.sync()?;
// Print the final read/write statistics.
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed());
self.print_stats(&i, &prog_update);
Ok(()) Ok(())
} }
} }
@ -697,7 +730,7 @@ fn conv_block_unblock_helper<R: Read>(
} }
/// Read helper performs read operations common to all dd reads, and dispatches the buffer to relevant helper functions as dictated by the operations requested by the user. /// Read helper performs read operations common to all dd reads, and dispatches the buffer to relevant helper functions as dictated by the operations requested by the user.
fn read_helper<R: Read>(i: &mut Input<R>, bsize: usize) -> UResult<(ReadStat, Vec<u8>)> { fn read_helper<R: Read>(i: &mut Input<R>, bsize: usize) -> std::io::Result<(ReadStat, Vec<u8>)> {
// Local Predicate Fns ----------------------------------------------- // Local Predicate Fns -----------------------------------------------
fn is_conv<R: Read>(i: &Input<R>) -> bool { fn is_conv<R: Read>(i: &Input<R>) -> bool {
i.cflags.ctable.is_some() i.cflags.ctable.is_some()
@ -718,12 +751,8 @@ fn read_helper<R: Read>(i: &mut Input<R>, bsize: usize) -> UResult<(ReadStat, Ve
// Read // Read
let mut buf = vec![BUF_INIT_BYTE; bsize]; let mut buf = vec![BUF_INIT_BYTE; bsize];
let mut rstat = match i.cflags.sync { let mut rstat = match i.cflags.sync {
Some(ch) => i Some(ch) => i.fill_blocks(&mut buf, ch)?,
.fill_blocks(&mut buf, ch) _ => i.fill_consecutive(&mut buf)?,
.map_err_context(|| "failed to write output".to_string())?,
_ => i
.fill_consecutive(&mut buf)
.map_err_context(|| "failed to write output".to_string())?,
}; };
// Return early if no data // Return early if no data
if rstat.reads_complete == 0 && rstat.reads_partial == 0 { if rstat.reads_complete == 0 && rstat.reads_partial == 0 {
@ -735,7 +764,7 @@ fn read_helper<R: Read>(i: &mut Input<R>, bsize: usize) -> UResult<(ReadStat, Ve
perform_swab(&mut buf); perform_swab(&mut buf);
} }
if is_conv(i) || is_block(i) || is_unblock(i) { if is_conv(i) || is_block(i) || is_unblock(i) {
let buf = conv_block_unblock_helper(buf, i, &mut rstat)?; let buf = conv_block_unblock_helper(buf, i, &mut rstat).unwrap();
Ok((rstat, buf)) Ok((rstat, buf))
} else { } else {
Ok((rstat, buf)) Ok((rstat, buf))
@ -925,22 +954,22 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
(true, true) => { (true, true) => {
let i = Input::<File>::new(&matches)?; let i = Input::<File>::new(&matches)?;
let o = Output::<File>::new(&matches)?; let o = Output::<File>::new(&matches)?;
o.dd_out(i) o.dd_out(i).map_err_context(|| "IO error".to_string())
} }
(false, true) => { (false, true) => {
let i = Input::<io::Stdin>::new(&matches)?; let i = Input::<io::Stdin>::new(&matches)?;
let o = Output::<File>::new(&matches)?; let o = Output::<File>::new(&matches)?;
o.dd_out(i) o.dd_out(i).map_err_context(|| "IO error".to_string())
} }
(true, false) => { (true, false) => {
let i = Input::<File>::new(&matches)?; let i = Input::<File>::new(&matches)?;
let o = Output::<io::Stdout>::new(&matches)?; let o = Output::<io::Stdout>::new(&matches)?;
o.dd_out(i) o.dd_out(i).map_err_context(|| "IO error".to_string())
} }
(false, false) => { (false, false) => {
let i = Input::<io::Stdin>::new(&matches)?; let i = Input::<io::Stdin>::new(&matches)?;
let o = Output::<io::Stdout>::new(&matches)?; let o = Output::<io::Stdout>::new(&matches)?;
o.dd_out(i) o.dd_out(i).map_err_context(|| "IO error".to_string())
} }
} }
} }