1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-29 12:07:46 +00:00

dd: fix output issues (#3610)

This commit is contained in:
Patrick Jackson 2022-06-23 23:58:10 -07:00 committed by GitHub
parent b6bb476aa0
commit 78a77c4211
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 36 deletions

View file

@ -346,15 +346,6 @@ where
})
}
/// Print the read/write statistics.
fn print_stats<R: Read>(&self, i: &Input<R>, prog_update: &ProgUpdate) {
match i.print_level {
Some(StatusLevel::None) => {}
Some(StatusLevel::Noxfer) => prog_update.print_io_lines(),
Some(StatusLevel::Progress) | None => prog_update.print_transfer_stats(),
}
}
/// Flush the output to disk, if configured to do so.
fn sync(&mut self) -> std::io::Result<()> {
if self.cflags.fsync {
@ -404,15 +395,17 @@ where
// Start a thread that reports transfer progress.
//
// When `status=progress` is given on the command-line, the
// `dd` program reports its progress every second or so. We
// The `dd` program reports its progress after every block is written,
// at most every 1 second, and only if `status=progress` is given on
// the command-line or a SIGUSR1 signal is received. We
// perform this reporting in a new thread so as not to take
// any CPU time away from the actual reading and writing of
// data. We send a `ProgUpdate` from the transmitter `prog_tx`
// to the receives `rx`, and the receiver prints the transfer
// information.
let (prog_tx, rx) = mpsc::channel();
thread::spawn(gen_prog_updater(rx, i.print_level));
let output_thread = thread::spawn(gen_prog_updater(rx, i.print_level));
let mut progress_as_secs = 0;
// Create a common buffer with a capacity of the block size.
// This is the max size needed.
@ -437,7 +430,7 @@ where
}
let wstat_update = self.write_blocks(&buf)?;
// Update the read/write stats and inform the progress thread.
// Update the read/write stats and inform the progress thread once per second.
//
// If the receiver is disconnected, `send()` returns an
// error. Since it is just reporting progress and is not
@ -445,16 +438,23 @@ where
// error.
rstat += rstat_update;
wstat += wstat_update;
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed());
prog_tx.send(prog_update).unwrap_or(());
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
if prog_update.duration.as_secs() >= progress_as_secs {
progress_as_secs = prog_update.duration.as_secs() + 1;
prog_tx.send(prog_update).unwrap_or(());
}
}
// Flush the output, if configured to do so.
self.sync()?;
// Print the final read/write statistics.
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed());
self.print_stats(&i, &prog_update);
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
prog_tx.send(prog_update).unwrap_or(());
// Wait for the output thread to finish
output_thread
.join()
.expect("Failed to join with the output thread.");
Ok(())
}
}

View file

@ -44,15 +44,26 @@ pub(crate) struct ProgUpdate {
/// The time period over which the reads and writes were measured.
pub(crate) duration: Duration,
/// The status of the write.
///
/// True if the write is completed, false if still in-progress.
pub(crate) complete: bool,
}
impl ProgUpdate {
/// Instantiate this struct.
pub(crate) fn new(read_stat: ReadStat, write_stat: WriteStat, duration: Duration) -> Self {
pub(crate) fn new(
read_stat: ReadStat,
write_stat: WriteStat,
duration: Duration,
complete: bool,
) -> Self {
Self {
read_stat,
write_stat,
duration,
complete,
}
}
@ -175,7 +186,8 @@ impl ProgUpdate {
/// This is a convenience method that calls
/// [`ProgUpdate::write_io_lines`] and
/// [`ProgUpdate::write_prog_line`] in that order. The information
/// is written to `w`.
/// is written to `w`. It optionally begins by writing a new line,
/// intended to handle the case of an existing progress line.
///
/// # Examples
///
@ -190,7 +202,7 @@ impl ProgUpdate {
/// duration: Duration::new(1, 0), // one second
/// };
/// let mut cursor = Cursor::new(vec![]);
/// prog_update.write_transfer_stats(&mut cursor).unwrap();
/// prog_update.write_transfer_stats(&mut cursor, false).unwrap();
/// let mut iter = cursor.get_ref().split(|v| *v == b'\n');
/// assert_eq!(iter.next().unwrap(), b"0+0 records in");
/// assert_eq!(iter.next().unwrap(), b"0+0 records out");
@ -198,7 +210,10 @@ impl ProgUpdate {
/// assert_eq!(iter.next().unwrap(), b"");
/// assert!(iter.next().is_none());
/// ```
fn write_transfer_stats(&self, w: &mut impl Write) -> std::io::Result<()> {
fn write_transfer_stats(&self, w: &mut impl Write, new_line: bool) -> std::io::Result<()> {
if new_line {
writeln!(w)?;
}
self.write_io_lines(w)?;
let rewrite = false;
self.write_prog_line(w, rewrite)?;
@ -225,9 +240,22 @@ impl ProgUpdate {
/// Write all summary statistics.
///
/// See [`ProgUpdate::write_transfer_stats`] for more information.
pub(crate) fn print_transfer_stats(&self) {
pub(crate) fn print_transfer_stats(&self, new_line: bool) {
let mut stderr = std::io::stderr();
self.write_transfer_stats(&mut stderr).unwrap();
self.write_transfer_stats(&mut stderr, new_line).unwrap();
}
/// Write all the final statistics.
pub(crate) fn print_final_stats(
&self,
print_level: Option<StatusLevel>,
progress_printed: bool,
) {
match print_level {
Some(StatusLevel::None) => {}
Some(StatusLevel::Noxfer) => self.print_io_lines(),
Some(StatusLevel::Progress) | None => self.print_transfer_stats(progress_printed),
}
}
}
@ -380,9 +408,16 @@ pub(crate) fn gen_prog_updater(
print_level: Option<StatusLevel>,
) -> impl Fn() {
move || {
let mut progress_printed = false;
while let Ok(update) = rx.recv() {
// Print the final read/write statistics.
if update.complete {
update.print_final_stats(print_level, progress_printed);
return;
}
if Some(StatusLevel::Progress) == print_level {
update.reprint_prog_line();
progress_printed = true;
}
}
}
@ -427,19 +462,29 @@ pub(crate) fn gen_prog_updater(
}
});
let mut progress_as_secs = 0;
// Holds the state of whether we have printed the current progress.
// This is needed so that we know whether or not to print a newline
// character before outputting non-progress data.
let mut progress_printed = false;
while let Ok(update) = rx.recv() {
// (Re)print status line if progress is requested.
if Some(StatusLevel::Progress) == print_level
&& update.duration.as_secs() >= progress_as_secs
{
update.reprint_prog_line();
progress_as_secs = update.duration.as_secs() + 1;
// Print the final read/write statistics.
if update.complete {
update.print_final_stats(print_level, progress_printed);
return;
}
// (Re)print status line if progress is requested.
if Some(StatusLevel::Progress) == print_level && !update.complete {
update.reprint_prog_line();
progress_printed = true;
}
// Handle signals and set the signal to un-seen.
// This will print a maximum of 1 time per second, even though it
// should be printing on every SIGUSR1.
if let SIGUSR1_USIZE = sigval.swap(0, Ordering::Relaxed) {
update.print_transfer_stats(progress_printed);
// Reset the progress printed, since print_transfer_stats always prints a newline.
progress_printed = false;
}
// Handle signals
if let SIGUSR1_USIZE = sigval.load(Ordering::Relaxed) {
update.print_transfer_stats();
};
}
}
}
@ -460,6 +505,7 @@ mod tests {
..Default::default()
},
duration: Duration::new(1, 0), // one second
complete: false,
}
}
@ -484,10 +530,12 @@ mod tests {
let read_stat = ReadStat::new(1, 2, 3);
let write_stat = WriteStat::new(4, 5, 6);
let duration = Duration::new(789, 0);
let complete = false;
let prog_update = ProgUpdate {
read_stat,
write_stat,
duration,
complete,
};
let mut cursor = Cursor::new(vec![]);
@ -500,7 +548,13 @@ mod tests {
#[test]
fn test_prog_update_write_prog_line() {
let prog_update = prog_update_write(0);
let prog_update = ProgUpdate {
read_stat: Default::default(),
write_stat: Default::default(),
duration: Duration::new(1, 0), // one second
complete: false,
};
let mut cursor = Cursor::new(vec![]);
let rewrite = false;
prog_update.write_prog_line(&mut cursor, rewrite).unwrap();
@ -551,9 +605,12 @@ mod tests {
read_stat: Default::default(),
write_stat: Default::default(),
duration: Duration::new(1, 0), // one second
complete: false,
};
let mut cursor = Cursor::new(vec![]);
prog_update.write_transfer_stats(&mut cursor).unwrap();
prog_update
.write_transfer_stats(&mut cursor, false)
.unwrap();
let mut iter = cursor.get_ref().split(|v| *v == b'\n');
assert_eq!(iter.next().unwrap(), b"0+0 records in");
assert_eq!(iter.next().unwrap(), b"0+0 records out");
@ -561,4 +618,26 @@ mod tests {
assert_eq!(iter.next().unwrap(), b"");
assert!(iter.next().is_none());
}
#[test]
fn write_final_transfer_stats() {
// Tests the formatting of the final statistics written after a progress line.
let prog_update = ProgUpdate {
read_stat: Default::default(),
write_stat: Default::default(),
duration: Duration::new(1, 0), // one second
complete: false,
};
let mut cursor = Cursor::new(vec![]);
let rewrite = true;
prog_update.write_prog_line(&mut cursor, rewrite).unwrap();
prog_update.write_transfer_stats(&mut cursor, true).unwrap();
let mut iter = cursor.get_ref().split(|v| *v == b'\n');
assert_eq!(iter.next().unwrap(), b"\r0 bytes copied, 1.0 s, 0 B/s");
assert_eq!(iter.next().unwrap(), b"0+0 records in");
assert_eq!(iter.next().unwrap(), b"0+0 records out");
assert_eq!(iter.next().unwrap(), b"0 bytes copied, 1.0 s, 0 B/s");
assert_eq!(iter.next().unwrap(), b"");
assert!(iter.next().is_none());
}
}