mirror of
https://github.com/RGBCube/uutils-coreutils
synced 2025-07-28 03:27:44 +00:00
handle SIGUSR1 directly. not just every 1sec
This commit is contained in:
parent
88ff42e840
commit
43b2b3fbaa
2 changed files with 128 additions and 74 deletions
|
@ -21,6 +21,7 @@ use nix::fcntl::FcntlArg::F_SETFL;
|
|||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
use nix::fcntl::OFlag;
|
||||
use parseargs::Parser;
|
||||
use progress::ProgUpdateType;
|
||||
use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat};
|
||||
use uucore::io::OwnedFileDescriptorOrHandle;
|
||||
|
||||
|
@ -39,10 +40,8 @@ use std::os::unix::{
|
|||
#[cfg(windows)]
|
||||
use std::os::windows::{fs::MetadataExt, io::AsHandle};
|
||||
use std::path::Path;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering::Relaxed},
|
||||
mpsc, Arc,
|
||||
};
|
||||
use std::sync::atomic::AtomicU8;
|
||||
use std::sync::{atomic::Ordering::Relaxed, mpsc, Arc};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
|
@ -94,29 +93,41 @@ struct Settings {
|
|||
/// the first caller each interval will yield true.
|
||||
///
|
||||
/// When all instances are dropped the background thread will exit on the next interval.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Alarm {
|
||||
interval: Duration,
|
||||
trigger: Arc<AtomicBool>,
|
||||
trigger: Arc<AtomicU8>,
|
||||
}
|
||||
|
||||
const TRIGGER_NONE: u8 = 0;
|
||||
const TRIGGER_TIMER: u8 = 1;
|
||||
const TRIGGER_SIGNAL: u8 = 2;
|
||||
|
||||
impl Alarm {
|
||||
pub fn with_interval(interval: Duration) -> Self {
|
||||
let trigger = Arc::new(AtomicBool::default());
|
||||
let trigger = Arc::new(AtomicU8::default());
|
||||
|
||||
let weak_trigger = Arc::downgrade(&trigger);
|
||||
thread::spawn(move || {
|
||||
while let Some(trigger) = weak_trigger.upgrade() {
|
||||
thread::sleep(interval);
|
||||
trigger.store(true, Relaxed);
|
||||
trigger.store(TRIGGER_TIMER, Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
Self { interval, trigger }
|
||||
}
|
||||
|
||||
pub fn is_triggered(&self) -> bool {
|
||||
self.trigger.swap(false, Relaxed)
|
||||
pub fn manual_trigger_fn(&self) -> Box<dyn Send + Sync + Fn()> {
|
||||
let weak_trigger = Arc::downgrade(&self.trigger);
|
||||
Box::new(move || {
|
||||
if let Some(trigger) = weak_trigger.upgrade() {
|
||||
trigger.store(TRIGGER_SIGNAL, Relaxed);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_trigger(&self) -> u8 {
|
||||
self.trigger.swap(TRIGGER_NONE, Relaxed)
|
||||
}
|
||||
|
||||
pub fn get_interval(&self) -> Duration {
|
||||
|
@ -1018,6 +1029,18 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
|
|||
// This avoids the need to query the OS monotonic clock for every block.
|
||||
let alarm = Alarm::with_interval(Duration::from_secs(1));
|
||||
|
||||
// The signal handler spawns an own thread that waits for signals.
|
||||
// When the signal is received, it calls a handler function.
|
||||
// We inject a handler function that manually triggers the alarm.
|
||||
#[cfg(target_os = "linux")]
|
||||
let signal_handler = progress::SignalHandler::install_signal_handler(alarm.manual_trigger_fn());
|
||||
#[cfg(target_os = "linux")]
|
||||
if let Err(e) = &signal_handler {
|
||||
if Some(StatusLevel::None) != i.settings.status {
|
||||
eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}");
|
||||
}
|
||||
}
|
||||
|
||||
// Index in the input file where we are reading bytes and in
|
||||
// the output file where we are writing bytes.
|
||||
//
|
||||
|
@ -1086,11 +1109,20 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
|
|||
// error.
|
||||
rstat += rstat_update;
|
||||
wstat += wstat_update;
|
||||
if alarm.is_triggered() {
|
||||
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
|
||||
prog_tx.send(prog_update).unwrap_or(());
|
||||
match alarm.get_trigger() {
|
||||
TRIGGER_NONE => {}
|
||||
t @ TRIGGER_TIMER | t @ TRIGGER_SIGNAL => {
|
||||
let tp = match t {
|
||||
TRIGGER_TIMER => ProgUpdateType::Periodic,
|
||||
_ => ProgUpdateType::Signal,
|
||||
};
|
||||
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), tp);
|
||||
prog_tx.send(prog_update).unwrap_or(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate)
|
||||
}
|
||||
|
||||
|
@ -1118,12 +1150,13 @@ fn finalize<T>(
|
|||
|
||||
// Print the final read/write statistics.
|
||||
let wstat = wstat + wstat_update;
|
||||
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
|
||||
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), ProgUpdateType::Final);
|
||||
prog_tx.send(prog_update).unwrap_or(());
|
||||
// Wait for the output thread to finish
|
||||
output_thread
|
||||
.join()
|
||||
.expect("Failed to join with the output thread.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -11,8 +11,12 @@
|
|||
//! updater that runs in its own thread.
|
||||
use std::io::Write;
|
||||
use std::sync::mpsc;
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use signal_hook::iterator::Handle;
|
||||
use uucore::{
|
||||
error::UResult,
|
||||
format::num_format::{FloatVariant, Formatter},
|
||||
|
@ -20,18 +24,12 @@ use uucore::{
|
|||
|
||||
use crate::numbers::{to_magnitude_and_suffix, SuffixType};
|
||||
|
||||
// On Linux, we register a signal handler that prints progress updates.
|
||||
#[cfg(target_os = "linux")]
|
||||
use signal_hook::consts::signal;
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::{
|
||||
env,
|
||||
error::Error,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub(crate) enum ProgUpdateType {
|
||||
Periodic,
|
||||
Signal,
|
||||
Final,
|
||||
}
|
||||
|
||||
/// Summary statistics for read and write progress of dd for a given duration.
|
||||
pub(crate) struct ProgUpdate {
|
||||
|
@ -53,7 +51,7 @@ pub(crate) struct ProgUpdate {
|
|||
/// The status of the write.
|
||||
///
|
||||
/// True if the write is completed, false if still in-progress.
|
||||
pub(crate) complete: bool,
|
||||
pub(crate) update_type: ProgUpdateType,
|
||||
}
|
||||
|
||||
impl ProgUpdate {
|
||||
|
@ -62,13 +60,13 @@ impl ProgUpdate {
|
|||
read_stat: ReadStat,
|
||||
write_stat: WriteStat,
|
||||
duration: Duration,
|
||||
complete: bool,
|
||||
update_type: ProgUpdateType,
|
||||
) -> Self {
|
||||
Self {
|
||||
read_stat,
|
||||
write_stat,
|
||||
duration,
|
||||
complete,
|
||||
update_type,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -433,7 +431,7 @@ pub(crate) fn gen_prog_updater(
|
|||
let mut progress_printed = false;
|
||||
while let Ok(update) = rx.recv() {
|
||||
// Print the final read/write statistics.
|
||||
if update.complete {
|
||||
if update.update_type == ProgUpdateType::Final {
|
||||
update.print_final_stats(print_level, progress_printed);
|
||||
return;
|
||||
}
|
||||
|
@ -445,6 +443,48 @@ pub(crate) fn gen_prog_updater(
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) struct SignalHandler {
|
||||
handle: Handle,
|
||||
thread: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
impl SignalHandler {
|
||||
pub(crate) fn install_signal_handler(
|
||||
f: Box<dyn Send + Sync + Fn()>,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
use signal_hook::consts::signal::*;
|
||||
use signal_hook::iterator::Signals;
|
||||
|
||||
let mut signals = Signals::new([SIGUSR1])?;
|
||||
let handle = signals.handle();
|
||||
let thread = std::thread::spawn(move || {
|
||||
for signal in &mut signals {
|
||||
match signal {
|
||||
SIGUSR1 => (*f)(),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
handle,
|
||||
thread: Some(thread),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
impl Drop for SignalHandler {
|
||||
fn drop(&mut self) {
|
||||
self.handle.close();
|
||||
if let Some(thread) = std::mem::take(&mut self.thread) {
|
||||
thread.join().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a closure that can be used in its own thread to print progress info.
|
||||
///
|
||||
/// This function returns a closure that receives [`ProgUpdate`]
|
||||
|
@ -459,50 +499,31 @@ pub(crate) fn gen_prog_updater(
|
|||
rx: mpsc::Receiver<ProgUpdate>,
|
||||
print_level: Option<StatusLevel>,
|
||||
) -> impl Fn() {
|
||||
// TODO: SIGINFO: Trigger progress line reprint. BSD-style Linux only.
|
||||
const SIGUSR1_USIZE: usize = signal::SIGUSR1 as usize;
|
||||
fn posixly_correct() -> bool {
|
||||
env::var("POSIXLY_CORRECT").is_ok()
|
||||
}
|
||||
fn register_linux_signal_handler(sigval: Arc<AtomicUsize>) -> Result<(), Box<dyn Error>> {
|
||||
if !posixly_correct() {
|
||||
signal_hook::flag::register_usize(signal::SIGUSR1, sigval, SIGUSR1_USIZE)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// --------------------------------------------------------------
|
||||
move || {
|
||||
let sigval = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
register_linux_signal_handler(sigval.clone()).unwrap_or_else(|e| {
|
||||
if Some(StatusLevel::None) != print_level {
|
||||
eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}");
|
||||
}
|
||||
});
|
||||
|
||||
// Holds the state of whether we have printed the current progress.
|
||||
// This is needed so that we know whether or not to print a newline
|
||||
// character before outputting non-progress data.
|
||||
let mut progress_printed = false;
|
||||
while let Ok(update) = rx.recv() {
|
||||
// Print the final read/write statistics.
|
||||
if update.complete {
|
||||
update.print_final_stats(print_level, progress_printed);
|
||||
return;
|
||||
}
|
||||
// (Re)print status line if progress is requested.
|
||||
if Some(StatusLevel::Progress) == print_level && !update.complete {
|
||||
update.reprint_prog_line();
|
||||
progress_printed = true;
|
||||
}
|
||||
// Handle signals and set the signal to un-seen.
|
||||
// This will print a maximum of 1 time per second, even though it
|
||||
// should be printing on every SIGUSR1.
|
||||
if let SIGUSR1_USIZE = sigval.swap(0, Ordering::Relaxed) {
|
||||
update.print_transfer_stats(progress_printed);
|
||||
// Reset the progress printed, since print_transfer_stats always prints a newline.
|
||||
progress_printed = false;
|
||||
match update.update_type {
|
||||
ProgUpdateType::Final => {
|
||||
// Print the final read/write statistics.
|
||||
update.print_final_stats(print_level, progress_printed);
|
||||
return;
|
||||
}
|
||||
ProgUpdateType::Periodic => {
|
||||
// (Re)print status line if progress is requested.
|
||||
if Some(StatusLevel::Progress) == print_level {
|
||||
update.reprint_prog_line();
|
||||
progress_printed = true;
|
||||
}
|
||||
}
|
||||
ProgUpdateType::Signal => {
|
||||
update.print_transfer_stats(progress_printed);
|
||||
// Reset the progress printed, since print_transfer_stats always prints a newline.
|
||||
progress_printed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -524,7 +545,7 @@ mod tests {
|
|||
..Default::default()
|
||||
},
|
||||
duration: Duration::new(1, 0), // one second
|
||||
complete: false,
|
||||
update_type: super::ProgUpdateType::Periodic,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -533,7 +554,7 @@ mod tests {
|
|||
read_stat: ReadStat::default(),
|
||||
write_stat: WriteStat::default(),
|
||||
duration,
|
||||
complete: false,
|
||||
update_type: super::ProgUpdateType::Periodic,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -558,12 +579,12 @@ mod tests {
|
|||
let read_stat = ReadStat::new(1, 2, 3, 4);
|
||||
let write_stat = WriteStat::new(4, 5, 6);
|
||||
let duration = Duration::new(789, 0);
|
||||
let complete = false;
|
||||
let update_type = super::ProgUpdateType::Periodic;
|
||||
let prog_update = ProgUpdate {
|
||||
read_stat,
|
||||
write_stat,
|
||||
duration,
|
||||
complete,
|
||||
update_type,
|
||||
};
|
||||
|
||||
let mut cursor = Cursor::new(vec![]);
|
||||
|
@ -580,7 +601,7 @@ mod tests {
|
|||
read_stat: ReadStat::default(),
|
||||
write_stat: WriteStat::default(),
|
||||
duration: Duration::new(1, 0), // one second
|
||||
complete: false,
|
||||
update_type: super::ProgUpdateType::Periodic,
|
||||
};
|
||||
|
||||
let mut cursor = Cursor::new(vec![]);
|
||||
|
@ -636,7 +657,7 @@ mod tests {
|
|||
read_stat: ReadStat::default(),
|
||||
write_stat: WriteStat::default(),
|
||||
duration: Duration::new(1, 0), // one second
|
||||
complete: false,
|
||||
update_type: super::ProgUpdateType::Periodic,
|
||||
};
|
||||
let mut cursor = Cursor::new(vec![]);
|
||||
prog_update
|
||||
|
@ -657,7 +678,7 @@ mod tests {
|
|||
read_stat: ReadStat::default(),
|
||||
write_stat: WriteStat::default(),
|
||||
duration: Duration::new(1, 0), // one second
|
||||
complete: false,
|
||||
update_type: super::ProgUpdateType::Periodic,
|
||||
};
|
||||
let mut cursor = Cursor::new(vec![]);
|
||||
let rewrite = true;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue