diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index e47f923ee..ef62943ee 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -346,15 +346,6 @@ where }) } - /// Print the read/write statistics. - fn print_stats(&self, i: &Input, prog_update: &ProgUpdate) { - match i.print_level { - Some(StatusLevel::None) => {} - Some(StatusLevel::Noxfer) => prog_update.print_io_lines(), - Some(StatusLevel::Progress) | None => prog_update.print_transfer_stats(), - } - } - /// Flush the output to disk, if configured to do so. fn sync(&mut self) -> std::io::Result<()> { if self.cflags.fsync { @@ -404,15 +395,17 @@ where // Start a thread that reports transfer progress. // - // When `status=progress` is given on the command-line, the - // `dd` program reports its progress every second or so. We + // The `dd` program reports its progress after every block is written, + // at most every 1 second, and only if `status=progress` is given on + // the command-line or a SIGUSR1 signal is received. We // perform this reporting in a new thread so as not to take // any CPU time away from the actual reading and writing of // data. We send a `ProgUpdate` from the transmitter `prog_tx` // to the receives `rx`, and the receiver prints the transfer // information. let (prog_tx, rx) = mpsc::channel(); - thread::spawn(gen_prog_updater(rx, i.print_level)); + let output_thread = thread::spawn(gen_prog_updater(rx, i.print_level)); + let mut progress_as_secs = 0; // Create a common buffer with a capacity of the block size. // This is the max size needed. @@ -437,7 +430,7 @@ where } let wstat_update = self.write_blocks(&buf)?; - // Update the read/write stats and inform the progress thread. + // Update the read/write stats and inform the progress thread once per second. // // If the receiver is disconnected, `send()` returns an // error. Since it is just reporting progress and is not @@ -445,16 +438,23 @@ where // error. rstat += rstat_update; wstat += wstat_update; - let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed()); - prog_tx.send(prog_update).unwrap_or(()); + let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); + if prog_update.duration.as_secs() >= progress_as_secs { + progress_as_secs = prog_update.duration.as_secs() + 1; + prog_tx.send(prog_update).unwrap_or(()); + } } // Flush the output, if configured to do so. self.sync()?; // Print the final read/write statistics. - let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed()); - self.print_stats(&i, &prog_update); + let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true); + prog_tx.send(prog_update).unwrap_or(()); + // Wait for the output thread to finish + output_thread + .join() + .expect("Failed to join with the output thread."); Ok(()) } } diff --git a/src/uu/dd/src/progress.rs b/src/uu/dd/src/progress.rs index 315fb862a..312d8f096 100644 --- a/src/uu/dd/src/progress.rs +++ b/src/uu/dd/src/progress.rs @@ -44,15 +44,26 @@ pub(crate) struct ProgUpdate { /// The time period over which the reads and writes were measured. pub(crate) duration: Duration, + + /// The status of the write. + /// + /// True if the write is completed, false if still in-progress. + pub(crate) complete: bool, } impl ProgUpdate { /// Instantiate this struct. - pub(crate) fn new(read_stat: ReadStat, write_stat: WriteStat, duration: Duration) -> Self { + pub(crate) fn new( + read_stat: ReadStat, + write_stat: WriteStat, + duration: Duration, + complete: bool, + ) -> Self { Self { read_stat, write_stat, duration, + complete, } } @@ -175,7 +186,8 @@ impl ProgUpdate { /// This is a convenience method that calls /// [`ProgUpdate::write_io_lines`] and /// [`ProgUpdate::write_prog_line`] in that order. The information - /// is written to `w`. + /// is written to `w`. It optionally begins by writing a new line, + /// intended to handle the case of an existing progress line. /// /// # Examples /// @@ -190,7 +202,7 @@ impl ProgUpdate { /// duration: Duration::new(1, 0), // one second /// }; /// let mut cursor = Cursor::new(vec![]); - /// prog_update.write_transfer_stats(&mut cursor).unwrap(); + /// prog_update.write_transfer_stats(&mut cursor, false).unwrap(); /// let mut iter = cursor.get_ref().split(|v| *v == b'\n'); /// assert_eq!(iter.next().unwrap(), b"0+0 records in"); /// assert_eq!(iter.next().unwrap(), b"0+0 records out"); @@ -198,7 +210,10 @@ impl ProgUpdate { /// assert_eq!(iter.next().unwrap(), b""); /// assert!(iter.next().is_none()); /// ``` - fn write_transfer_stats(&self, w: &mut impl Write) -> std::io::Result<()> { + fn write_transfer_stats(&self, w: &mut impl Write, new_line: bool) -> std::io::Result<()> { + if new_line { + writeln!(w)?; + } self.write_io_lines(w)?; let rewrite = false; self.write_prog_line(w, rewrite)?; @@ -225,9 +240,22 @@ impl ProgUpdate { /// Write all summary statistics. /// /// See [`ProgUpdate::write_transfer_stats`] for more information. - pub(crate) fn print_transfer_stats(&self) { + pub(crate) fn print_transfer_stats(&self, new_line: bool) { let mut stderr = std::io::stderr(); - self.write_transfer_stats(&mut stderr).unwrap(); + self.write_transfer_stats(&mut stderr, new_line).unwrap(); + } + + /// Write all the final statistics. + pub(crate) fn print_final_stats( + &self, + print_level: Option, + progress_printed: bool, + ) { + match print_level { + Some(StatusLevel::None) => {} + Some(StatusLevel::Noxfer) => self.print_io_lines(), + Some(StatusLevel::Progress) | None => self.print_transfer_stats(progress_printed), + } } } @@ -380,9 +408,16 @@ pub(crate) fn gen_prog_updater( print_level: Option, ) -> impl Fn() { move || { + let mut progress_printed = false; while let Ok(update) = rx.recv() { + // Print the final read/write statistics. + if update.complete { + update.print_final_stats(print_level, progress_printed); + return; + } if Some(StatusLevel::Progress) == print_level { update.reprint_prog_line(); + progress_printed = true; } } } @@ -427,19 +462,29 @@ pub(crate) fn gen_prog_updater( } }); - let mut progress_as_secs = 0; + // Holds the state of whether we have printed the current progress. + // This is needed so that we know whether or not to print a newline + // character before outputting non-progress data. + let mut progress_printed = false; while let Ok(update) = rx.recv() { - // (Re)print status line if progress is requested. - if Some(StatusLevel::Progress) == print_level - && update.duration.as_secs() >= progress_as_secs - { - update.reprint_prog_line(); - progress_as_secs = update.duration.as_secs() + 1; + // Print the final read/write statistics. + if update.complete { + update.print_final_stats(print_level, progress_printed); + return; + } + // (Re)print status line if progress is requested. + if Some(StatusLevel::Progress) == print_level && !update.complete { + update.reprint_prog_line(); + progress_printed = true; + } + // Handle signals and set the signal to un-seen. + // This will print a maximum of 1 time per second, even though it + // should be printing on every SIGUSR1. + if let SIGUSR1_USIZE = sigval.swap(0, Ordering::Relaxed) { + update.print_transfer_stats(progress_printed); + // Reset the progress printed, since print_transfer_stats always prints a newline. + progress_printed = false; } - // Handle signals - if let SIGUSR1_USIZE = sigval.load(Ordering::Relaxed) { - update.print_transfer_stats(); - }; } } } @@ -460,6 +505,7 @@ mod tests { ..Default::default() }, duration: Duration::new(1, 0), // one second + complete: false, } } @@ -484,10 +530,12 @@ mod tests { let read_stat = ReadStat::new(1, 2, 3); let write_stat = WriteStat::new(4, 5, 6); let duration = Duration::new(789, 0); + let complete = false; let prog_update = ProgUpdate { read_stat, write_stat, duration, + complete, }; let mut cursor = Cursor::new(vec![]); @@ -500,7 +548,13 @@ mod tests { #[test] fn test_prog_update_write_prog_line() { - let prog_update = prog_update_write(0); + let prog_update = ProgUpdate { + read_stat: Default::default(), + write_stat: Default::default(), + duration: Duration::new(1, 0), // one second + complete: false, + }; + let mut cursor = Cursor::new(vec![]); let rewrite = false; prog_update.write_prog_line(&mut cursor, rewrite).unwrap(); @@ -551,9 +605,12 @@ mod tests { read_stat: Default::default(), write_stat: Default::default(), duration: Duration::new(1, 0), // one second + complete: false, }; let mut cursor = Cursor::new(vec![]); - prog_update.write_transfer_stats(&mut cursor).unwrap(); + prog_update + .write_transfer_stats(&mut cursor, false) + .unwrap(); let mut iter = cursor.get_ref().split(|v| *v == b'\n'); assert_eq!(iter.next().unwrap(), b"0+0 records in"); assert_eq!(iter.next().unwrap(), b"0+0 records out"); @@ -561,4 +618,26 @@ mod tests { assert_eq!(iter.next().unwrap(), b""); assert!(iter.next().is_none()); } + + #[test] + fn write_final_transfer_stats() { + // Tests the formatting of the final statistics written after a progress line. + let prog_update = ProgUpdate { + read_stat: Default::default(), + write_stat: Default::default(), + duration: Duration::new(1, 0), // one second + complete: false, + }; + let mut cursor = Cursor::new(vec![]); + let rewrite = true; + prog_update.write_prog_line(&mut cursor, rewrite).unwrap(); + prog_update.write_transfer_stats(&mut cursor, true).unwrap(); + let mut iter = cursor.get_ref().split(|v| *v == b'\n'); + assert_eq!(iter.next().unwrap(), b"\r0 bytes copied, 1.0 s, 0 B/s"); + assert_eq!(iter.next().unwrap(), b"0+0 records in"); + assert_eq!(iter.next().unwrap(), b"0+0 records out"); + assert_eq!(iter.next().unwrap(), b"0 bytes copied, 1.0 s, 0 B/s"); + assert_eq!(iter.next().unwrap(), b""); + assert!(iter.next().is_none()); + } }