diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 6727f4bb0..4e1287939 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -36,9 +36,12 @@ use std::os::unix::{ io::{AsRawFd, FromRawFd}, }; use std::path::Path; -use std::sync::mpsc; +use std::sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + mpsc, Arc, +}; use std::thread; -use std::time; +use std::time::{Duration, Instant}; use clap::{crate_version, Arg, Command}; use gcd::Gcd; @@ -75,6 +78,45 @@ struct Settings { status: Option, } +/// A timer which triggers on a given interval +/// +/// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`] +/// will return true once per the given [`Duration`]. +/// +/// Can be cloned, but the trigger status is shared across all instances so only +/// the first caller each interval will yield true. +/// +/// When all instances are dropped the background thread will exit on the next interval. +#[derive(Debug, Clone)] +pub struct Alarm { + interval: Duration, + trigger: Arc, +} + +impl Alarm { + pub fn with_interval(interval: Duration) -> Self { + let trigger = Arc::new(AtomicBool::default()); + + let weak_trigger = Arc::downgrade(&trigger); + thread::spawn(move || { + while let Some(trigger) = weak_trigger.upgrade() { + thread::sleep(interval); + trigger.store(true, Relaxed); + } + }); + + Self { interval, trigger } + } + + pub fn is_triggered(&self) -> bool { + self.trigger.swap(false, Relaxed) + } + + pub fn get_interval(&self) -> Duration { + self.interval + } +} + /// A number in blocks or bytes /// /// Some values (seek, skip, iseek, oseek) can have values either in blocks or in bytes. @@ -760,7 +802,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // of its report includes the throughput in bytes per second, // which requires knowing how long the process has been // running. - let start = time::Instant::now(); + let start = Instant::now(); // A good buffer size for reading. // @@ -780,7 +822,6 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // information. let (prog_tx, rx) = mpsc::channel(); let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status)); - let mut progress_as_secs = 0; // Optimization: if no blocks are to be written, then don't // bother allocating any buffers. @@ -813,6 +854,12 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // This is the max size needed. let mut buf = vec![BUF_INIT_BYTE; bsize]; + // Spawn a timer thread to provide a scheduled signal indicating when we + // should send an update of our progress to the reporting thread. + // + // This avoids the need to query the OS monotonic clock for every block. + let alarm = Alarm::with_interval(Duration::from_secs(1)); + // Index in the input file where we are reading bytes and in // the output file where we are writing bytes. // @@ -871,9 +918,8 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // error. rstat += rstat_update; wstat += wstat_update; - let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); - if prog_update.duration.as_secs() >= progress_as_secs { - progress_as_secs = prog_update.duration.as_secs() + 1; + if alarm.is_triggered() { + let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); prog_tx.send(prog_update).unwrap_or(()); } } @@ -885,7 +931,7 @@ fn finalize( output: &mut Output, rstat: ReadStat, wstat: WriteStat, - start: time::Instant, + start: Instant, prog_tx: &mpsc::Sender, output_thread: thread::JoinHandle, ) -> std::io::Result<()> {