1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-27 11:07:44 +00:00

Merge pull request #6025 from cre4ture/feature/dd_direct_progress

dd: handle SIGUSR1 directly. not just every 1sec
This commit is contained in:
Sylvestre Ledru 2024-03-30 22:36:03 +01:00 committed by GitHub
commit c99e1c6813
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 195 additions and 93 deletions

View file

@ -21,6 +21,7 @@ use nix::fcntl::FcntlArg::F_SETFL;
#[cfg(any(target_os = "linux", target_os = "android"))] #[cfg(any(target_os = "linux", target_os = "android"))]
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
use parseargs::Parser; use parseargs::Parser;
use progress::ProgUpdateType;
use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat}; use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat};
use uucore::io::OwnedFileDescriptorOrHandle; use uucore::io::OwnedFileDescriptorOrHandle;
@ -39,10 +40,8 @@ use std::os::unix::{
#[cfg(windows)] #[cfg(windows)]
use std::os::windows::{fs::MetadataExt, io::AsHandle}; use std::os::windows::{fs::MetadataExt, io::AsHandle};
use std::path::Path; use std::path::Path;
use std::sync::{ use std::sync::atomic::AtomicU8;
atomic::{AtomicBool, Ordering::Relaxed}, use std::sync::{atomic::Ordering::Relaxed, mpsc, Arc};
mpsc, Arc,
};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -87,38 +86,65 @@ struct Settings {
/// A timer which triggers on a given interval /// A timer which triggers on a given interval
/// ///
/// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`] /// After being constructed with [`Alarm::with_interval`], [`Alarm::get_trigger`]
/// will return true once per the given [`Duration`]. /// will return [`ALARM_TRIGGER_TIMER`] once per the given [`Duration`].
/// Alarm can be manually triggered with closure returned by [`Alarm::manual_trigger_fn`].
/// [`Alarm::get_trigger`] will return [`ALARM_TRIGGER_SIGNAL`] in this case.
/// ///
/// Can be cloned, but the trigger status is shared across all instances so only /// Can be cloned, but the trigger status is shared across all instances so only
/// the first caller each interval will yield true. /// the first caller each interval will yield true.
/// ///
/// When all instances are dropped the background thread will exit on the next interval. /// When all instances are dropped the background thread will exit on the next interval.
#[derive(Debug, Clone)]
pub struct Alarm { pub struct Alarm {
interval: Duration, interval: Duration,
trigger: Arc<AtomicBool>, trigger: Arc<AtomicU8>,
} }
pub const ALARM_TRIGGER_NONE: u8 = 0;
pub const ALARM_TRIGGER_TIMER: u8 = 1;
pub const ALARM_TRIGGER_SIGNAL: u8 = 2;
impl Alarm { impl Alarm {
/// use to construct alarm timer with duration
pub fn with_interval(interval: Duration) -> Self { pub fn with_interval(interval: Duration) -> Self {
let trigger = Arc::new(AtomicBool::default()); let trigger = Arc::new(AtomicU8::default());
let weak_trigger = Arc::downgrade(&trigger); let weak_trigger = Arc::downgrade(&trigger);
thread::spawn(move || { thread::spawn(move || {
while let Some(trigger) = weak_trigger.upgrade() { while let Some(trigger) = weak_trigger.upgrade() {
thread::sleep(interval); thread::sleep(interval);
trigger.store(true, Relaxed); trigger.store(ALARM_TRIGGER_TIMER, Relaxed);
} }
}); });
Self { interval, trigger } Self { interval, trigger }
} }
pub fn is_triggered(&self) -> bool { /// Returns a closure that allows to manually trigger the alarm
self.trigger.swap(false, Relaxed) ///
/// This is useful for cases where more than one alarm even source exists
/// In case of `dd` there is the SIGUSR1/SIGINFO case where we want to
/// trigger an manual progress report.
pub fn manual_trigger_fn(&self) -> Box<dyn Send + Sync + Fn()> {
let weak_trigger = Arc::downgrade(&self.trigger);
Box::new(move || {
if let Some(trigger) = weak_trigger.upgrade() {
trigger.store(ALARM_TRIGGER_SIGNAL, Relaxed);
}
})
} }
/// Use this function to poll for any pending alarm event
///
/// Returns `ALARM_TRIGGER_NONE` for no pending event.
/// Returns `ALARM_TRIGGER_TIMER` if the event was triggered by timer
/// Returns `ALARM_TRIGGER_SIGNAL` if the event was triggered manually
/// by the closure returned from `manual_trigger_fn`
pub fn get_trigger(&self) -> u8 {
self.trigger.swap(ALARM_TRIGGER_NONE, Relaxed)
}
// Getter function for the configured interval duration
pub fn get_interval(&self) -> Duration { pub fn get_interval(&self) -> Duration {
self.interval self.interval
} }
@ -818,6 +844,30 @@ impl<'a> Output<'a> {
} }
} }
/// writes a block of data. optionally retries when first try didn't complete
///
/// this is needed by gnu-test: tests/dd/stats.s
/// the write can be interrupted by a system signal.
/// e.g. SIGUSR1 which is send to report status
/// without retry, the data might not be fully written to destination.
fn write_block(&mut self, chunk: &[u8]) -> io::Result<usize> {
let full_len = chunk.len();
let mut base_idx = 0;
loop {
match self.dst.write(&chunk[base_idx..]) {
Ok(wlen) => {
base_idx += wlen;
// take iflags.fullblock as oflags shall not have this option
if (base_idx >= full_len) || !self.settings.iflags.fullblock {
return Ok(base_idx);
}
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
}
/// Write the given bytes one block at a time. /// Write the given bytes one block at a time.
/// ///
/// This may write partial blocks (for example, if the underlying /// This may write partial blocks (for example, if the underlying
@ -831,7 +881,7 @@ impl<'a> Output<'a> {
let mut bytes_total = 0; let mut bytes_total = 0;
for chunk in buf.chunks(self.settings.obs) { for chunk in buf.chunks(self.settings.obs) {
let wlen = self.dst.write(chunk)?; let wlen = self.write_block(chunk)?;
if wlen < self.settings.obs { if wlen < self.settings.obs {
writes_partial += 1; writes_partial += 1;
} else { } else {
@ -922,6 +972,29 @@ impl<'a> BlockWriter<'a> {
} }
} }
/// depending on the command line arguments, this function
/// informs the OS to flush/discard the caches for input and/or output file.
fn flush_caches_full_length(i: &Input, o: &Output) -> std::io::Result<()> {
// TODO Better error handling for overflowing `len`.
if i.settings.iflags.nocache {
let offset = 0;
#[allow(clippy::useless_conversion)]
let len = i.src.len()?.try_into().unwrap();
i.discard_cache(offset, len);
}
// Similarly, discard the system cache for the output file.
//
// TODO Better error handling for overflowing `len`.
if i.settings.oflags.nocache {
let offset = 0;
#[allow(clippy::useless_conversion)]
let len = o.dst.len()?.try_into().unwrap();
o.discard_cache(offset, len);
}
Ok(())
}
/// Copy the given input data to this output, consuming both. /// Copy the given input data to this output, consuming both.
/// ///
/// This method contains the main loop for the `dd` program. Bytes /// This method contains the main loop for the `dd` program. Bytes
@ -981,22 +1054,7 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
// requests that we inform the system that we no longer // requests that we inform the system that we no longer
// need the contents of the input file in a system cache. // need the contents of the input file in a system cache.
// //
// TODO Better error handling for overflowing `len`. flush_caches_full_length(&i, &o)?;
if i.settings.iflags.nocache {
let offset = 0;
#[allow(clippy::useless_conversion)]
let len = i.src.len()?.try_into().unwrap();
i.discard_cache(offset, len);
}
// Similarly, discard the system cache for the output file.
//
// TODO Better error handling for overflowing `len`.
if i.settings.oflags.nocache {
let offset = 0;
#[allow(clippy::useless_conversion)]
let len = o.dst.len()?.try_into().unwrap();
o.discard_cache(offset, len);
}
return finalize( return finalize(
BlockWriter::Unbuffered(o), BlockWriter::Unbuffered(o),
rstat, rstat,
@ -1018,6 +1076,18 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
// This avoids the need to query the OS monotonic clock for every block. // This avoids the need to query the OS monotonic clock for every block.
let alarm = Alarm::with_interval(Duration::from_secs(1)); let alarm = Alarm::with_interval(Duration::from_secs(1));
// The signal handler spawns an own thread that waits for signals.
// When the signal is received, it calls a handler function.
// We inject a handler function that manually triggers the alarm.
#[cfg(target_os = "linux")]
let signal_handler = progress::SignalHandler::install_signal_handler(alarm.manual_trigger_fn());
#[cfg(target_os = "linux")]
if let Err(e) = &signal_handler {
if Some(StatusLevel::None) != i.settings.status {
eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}");
}
}
// Index in the input file where we are reading bytes and in // Index in the input file where we are reading bytes and in
// the output file where we are writing bytes. // the output file where we are writing bytes.
// //
@ -1086,11 +1156,20 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
// error. // error.
rstat += rstat_update; rstat += rstat_update;
wstat += wstat_update; wstat += wstat_update;
if alarm.is_triggered() { match alarm.get_trigger() {
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); ALARM_TRIGGER_NONE => {}
t @ ALARM_TRIGGER_TIMER | t @ ALARM_TRIGGER_SIGNAL => {
let tp = match t {
ALARM_TRIGGER_TIMER => ProgUpdateType::Periodic,
_ => ProgUpdateType::Signal,
};
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), tp);
prog_tx.send(prog_update).unwrap_or(()); prog_tx.send(prog_update).unwrap_or(());
} }
_ => {}
} }
}
finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate) finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate)
} }
@ -1118,12 +1197,13 @@ fn finalize<T>(
// Print the final read/write statistics. // Print the final read/write statistics.
let wstat = wstat + wstat_update; let wstat = wstat + wstat_update;
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true); let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), ProgUpdateType::Final);
prog_tx.send(prog_update).unwrap_or(()); prog_tx.send(prog_update).unwrap_or(());
// Wait for the output thread to finish // Wait for the output thread to finish
output_thread output_thread
.join() .join()
.expect("Failed to join with the output thread."); .expect("Failed to join with the output thread.");
Ok(()) Ok(())
} }

View file

@ -11,8 +11,12 @@
//! updater that runs in its own thread. //! updater that runs in its own thread.
use std::io::Write; use std::io::Write;
use std::sync::mpsc; use std::sync::mpsc;
#[cfg(target_os = "linux")]
use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
#[cfg(target_os = "linux")]
use signal_hook::iterator::Handle;
use uucore::{ use uucore::{
error::UResult, error::UResult,
format::num_format::{FloatVariant, Formatter}, format::num_format::{FloatVariant, Formatter},
@ -20,18 +24,12 @@ use uucore::{
use crate::numbers::{to_magnitude_and_suffix, SuffixType}; use crate::numbers::{to_magnitude_and_suffix, SuffixType};
// On Linux, we register a signal handler that prints progress updates. #[derive(PartialEq, Eq)]
#[cfg(target_os = "linux")] pub(crate) enum ProgUpdateType {
use signal_hook::consts::signal; Periodic,
#[cfg(target_os = "linux")] Signal,
use std::{ Final,
env, }
error::Error,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
/// Summary statistics for read and write progress of dd for a given duration. /// Summary statistics for read and write progress of dd for a given duration.
pub(crate) struct ProgUpdate { pub(crate) struct ProgUpdate {
@ -53,7 +51,7 @@ pub(crate) struct ProgUpdate {
/// The status of the write. /// The status of the write.
/// ///
/// True if the write is completed, false if still in-progress. /// True if the write is completed, false if still in-progress.
pub(crate) complete: bool, pub(crate) update_type: ProgUpdateType,
} }
impl ProgUpdate { impl ProgUpdate {
@ -62,13 +60,13 @@ impl ProgUpdate {
read_stat: ReadStat, read_stat: ReadStat,
write_stat: WriteStat, write_stat: WriteStat,
duration: Duration, duration: Duration,
complete: bool, update_type: ProgUpdateType,
) -> Self { ) -> Self {
Self { Self {
read_stat, read_stat,
write_stat, write_stat,
duration, duration,
complete, update_type,
} }
} }
@ -433,7 +431,7 @@ pub(crate) fn gen_prog_updater(
let mut progress_printed = false; let mut progress_printed = false;
while let Ok(update) = rx.recv() { while let Ok(update) = rx.recv() {
// Print the final read/write statistics. // Print the final read/write statistics.
if update.complete { if update.update_type == ProgUpdateType::Final {
update.print_final_stats(print_level, progress_printed); update.print_final_stats(print_level, progress_printed);
return; return;
} }
@ -445,6 +443,49 @@ pub(crate) fn gen_prog_updater(
} }
} }
/// signal handler listens for SIGUSR1 signal and runs provided closure.
#[cfg(target_os = "linux")]
pub(crate) struct SignalHandler {
handle: Handle,
thread: Option<JoinHandle<()>>,
}
#[cfg(target_os = "linux")]
impl SignalHandler {
pub(crate) fn install_signal_handler(
f: Box<dyn Send + Sync + Fn()>,
) -> Result<Self, std::io::Error> {
use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
let mut signals = Signals::new([SIGUSR1])?;
let handle = signals.handle();
let thread = std::thread::spawn(move || {
for signal in &mut signals {
match signal {
SIGUSR1 => (*f)(),
_ => unreachable!(),
}
}
});
Ok(Self {
handle,
thread: Some(thread),
})
}
}
#[cfg(target_os = "linux")]
impl Drop for SignalHandler {
fn drop(&mut self) {
self.handle.close();
if let Some(thread) = std::mem::take(&mut self.thread) {
thread.join().unwrap();
}
}
}
/// Return a closure that can be used in its own thread to print progress info. /// Return a closure that can be used in its own thread to print progress info.
/// ///
/// This function returns a closure that receives [`ProgUpdate`] /// This function returns a closure that receives [`ProgUpdate`]
@ -459,47 +500,27 @@ pub(crate) fn gen_prog_updater(
rx: mpsc::Receiver<ProgUpdate>, rx: mpsc::Receiver<ProgUpdate>,
print_level: Option<StatusLevel>, print_level: Option<StatusLevel>,
) -> impl Fn() { ) -> impl Fn() {
// TODO: SIGINFO: Trigger progress line reprint. BSD-style Linux only.
const SIGUSR1_USIZE: usize = signal::SIGUSR1 as usize;
fn posixly_correct() -> bool {
env::var("POSIXLY_CORRECT").is_ok()
}
fn register_linux_signal_handler(sigval: Arc<AtomicUsize>) -> Result<(), Box<dyn Error>> {
if !posixly_correct() {
signal_hook::flag::register_usize(signal::SIGUSR1, sigval, SIGUSR1_USIZE)?;
}
Ok(())
}
// -------------------------------------------------------------- // --------------------------------------------------------------
move || { move || {
let sigval = Arc::new(AtomicUsize::new(0));
register_linux_signal_handler(sigval.clone()).unwrap_or_else(|e| {
if Some(StatusLevel::None) != print_level {
eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}");
}
});
// Holds the state of whether we have printed the current progress. // Holds the state of whether we have printed the current progress.
// This is needed so that we know whether or not to print a newline // This is needed so that we know whether or not to print a newline
// character before outputting non-progress data. // character before outputting non-progress data.
let mut progress_printed = false; let mut progress_printed = false;
while let Ok(update) = rx.recv() { while let Ok(update) = rx.recv() {
match update.update_type {
ProgUpdateType::Final => {
// Print the final read/write statistics. // Print the final read/write statistics.
if update.complete {
update.print_final_stats(print_level, progress_printed); update.print_final_stats(print_level, progress_printed);
return; return;
} }
ProgUpdateType::Periodic => {
// (Re)print status line if progress is requested. // (Re)print status line if progress is requested.
if Some(StatusLevel::Progress) == print_level && !update.complete { if Some(StatusLevel::Progress) == print_level {
update.reprint_prog_line(); update.reprint_prog_line();
progress_printed = true; progress_printed = true;
} }
// Handle signals and set the signal to un-seen. }
// This will print a maximum of 1 time per second, even though it ProgUpdateType::Signal => {
// should be printing on every SIGUSR1.
if let SIGUSR1_USIZE = sigval.swap(0, Ordering::Relaxed) {
update.print_transfer_stats(progress_printed); update.print_transfer_stats(progress_printed);
// Reset the progress printed, since print_transfer_stats always prints a newline. // Reset the progress printed, since print_transfer_stats always prints a newline.
progress_printed = false; progress_printed = false;
@ -507,6 +528,7 @@ pub(crate) fn gen_prog_updater(
} }
} }
} }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -524,7 +546,7 @@ mod tests {
..Default::default() ..Default::default()
}, },
duration: Duration::new(1, 0), // one second duration: Duration::new(1, 0), // one second
complete: false, update_type: super::ProgUpdateType::Periodic,
} }
} }
@ -533,7 +555,7 @@ mod tests {
read_stat: ReadStat::default(), read_stat: ReadStat::default(),
write_stat: WriteStat::default(), write_stat: WriteStat::default(),
duration, duration,
complete: false, update_type: super::ProgUpdateType::Periodic,
} }
} }
@ -558,12 +580,12 @@ mod tests {
let read_stat = ReadStat::new(1, 2, 3, 4); let read_stat = ReadStat::new(1, 2, 3, 4);
let write_stat = WriteStat::new(4, 5, 6); let write_stat = WriteStat::new(4, 5, 6);
let duration = Duration::new(789, 0); let duration = Duration::new(789, 0);
let complete = false; let update_type = super::ProgUpdateType::Periodic;
let prog_update = ProgUpdate { let prog_update = ProgUpdate {
read_stat, read_stat,
write_stat, write_stat,
duration, duration,
complete, update_type,
}; };
let mut cursor = Cursor::new(vec![]); let mut cursor = Cursor::new(vec![]);
@ -580,7 +602,7 @@ mod tests {
read_stat: ReadStat::default(), read_stat: ReadStat::default(),
write_stat: WriteStat::default(), write_stat: WriteStat::default(),
duration: Duration::new(1, 0), // one second duration: Duration::new(1, 0), // one second
complete: false, update_type: super::ProgUpdateType::Periodic,
}; };
let mut cursor = Cursor::new(vec![]); let mut cursor = Cursor::new(vec![]);
@ -636,7 +658,7 @@ mod tests {
read_stat: ReadStat::default(), read_stat: ReadStat::default(),
write_stat: WriteStat::default(), write_stat: WriteStat::default(),
duration: Duration::new(1, 0), // one second duration: Duration::new(1, 0), // one second
complete: false, update_type: super::ProgUpdateType::Periodic,
}; };
let mut cursor = Cursor::new(vec![]); let mut cursor = Cursor::new(vec![]);
prog_update prog_update
@ -657,7 +679,7 @@ mod tests {
read_stat: ReadStat::default(), read_stat: ReadStat::default(),
write_stat: WriteStat::default(), write_stat: WriteStat::default(),
duration: Duration::new(1, 0), // one second duration: Duration::new(1, 0), // one second
complete: false, update_type: super::ProgUpdateType::Periodic,
}; };
let mut cursor = Cursor::new(vec![]); let mut cursor = Cursor::new(vec![]);
let rewrite = true; let rewrite = true;