From 84b5e6f0a1275ffd6816d4478284ed2ef27e5f1e Mon Sep 17 00:00:00 2001 From: Clint Teece Date: Sat, 25 Nov 2023 09:13:12 -0500 Subject: [PATCH] du: start printing output immediately (#5552) * du: very rough draft of continuously printing output * du: clean up printing logic, still needs some polishing * du: gracefully handle case where `du` returns no `Stat`s * du: print output using separate thread * du: clean up print thread implementation * du: send ownership of `Stat`s to printing thread as soon as `du` is done with them * du: add basic error handling for communication between threads, use `StatPrinter` to handle printing thread logic * du: move printing grand total into `StatPrinter`, and move initialization of printing-related variables into `StatPrinter::new` * du: clean up calculation of `convert_size` function, and separate printing a single stat our into its own method in `StatPrinter` * du: have printing thread handle printing IO-related errors, to ensure error messages and regular output message are written one at a time * du: add comment explaining print thread, remove outdated comments and clippy allows * du: restore clippy allows for cognitive complexity --------- Co-authored-by: clint --- src/uu/du/src/du.rs | 317 +++++++++++++++++++++++++++----------------- 1 file changed, 193 insertions(+), 124 deletions(-) diff --git a/src/uu/du/src/du.rs b/src/uu/du/src/du.rs index 148b197df..dc03a64f2 100644 --- a/src/uu/du/src/du.rs +++ b/src/uu/du/src/du.rs @@ -16,8 +16,6 @@ use std::fs::File; use std::fs::Metadata; use std::io::BufRead; use std::io::BufReader; -use std::io::Result; -use std::iter; #[cfg(not(windows))] use std::os::unix::fs::MetadataExt; #[cfg(windows)] @@ -27,15 +25,17 @@ use std::os::windows::io::AsRawHandle; use std::path::Path; use std::path::PathBuf; use std::str::FromStr; +use std::sync::mpsc; +use std::thread; use std::time::{Duration, UNIX_EPOCH}; use std::{error::Error, fmt::Display}; use uucore::display::{print_verbatim, Quotable}; use uucore::error::FromIo; -use uucore::error::{set_exit_code, UError, UResult, USimpleError}; +use uucore::error::{UError, UResult, USimpleError}; use uucore::line_ending::LineEnding; use uucore::parse_glob; use uucore::parse_size::{parse_size_u64, ParseSizeError}; -use uucore::{format_usage, help_about, help_section, help_usage, show, show_error, show_warning}; +use uucore::{format_usage, help_about, help_section, help_usage, show, show_warning}; #[cfg(windows)] use windows_sys::Win32::Foundation::HANDLE; #[cfg(windows)] @@ -81,6 +81,7 @@ const USAGE: &str = help_usage!("du.md"); // TODO: Support Z & Y (currently limited by size of u64) const UNITS: [(char, u32); 6] = [('E', 6), ('P', 5), ('T', 4), ('G', 3), ('M', 2), ('K', 1)]; +#[derive(Clone)] struct Options { all: bool, max_depth: Option, @@ -93,7 +94,7 @@ struct Options { verbose: bool, } -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] enum Deref { All, Args(Vec), @@ -119,7 +120,7 @@ struct Stat { } impl Stat { - fn new(path: &Path, options: &Options) -> Result { + fn new(path: &Path, options: &Options) -> std::io::Result { // Determine whether to dereference (follow) the symbolic link let should_dereference = match &options.dereference { Deref::All => true, @@ -290,7 +291,6 @@ fn choose_size(matches: &ArgMatches, stat: &Stat) -> u64 { } // this takes `my_stat` to avoid having to stat files multiple times. -// XXX: this should use the impl Trait return type when it is stabilized #[allow(clippy::cognitive_complexity)] fn du( mut my_stat: Stat, @@ -298,18 +298,16 @@ fn du( depth: usize, seen_inodes: &mut HashSet, exclude: &[Pattern], -) -> Box> { - let mut stats = vec![]; - let mut futures = vec![]; - + print_tx: &mpsc::Sender>, +) -> Result>>> { if my_stat.is_dir { let read = match fs::read_dir(&my_stat.path) { Ok(read) => read, Err(e) => { - show!( - e.map_err_context(|| format!("cannot read directory {}", my_stat.path.quote())) - ); - return Box::new(iter::once(my_stat)); + print_tx.send(Err(e.map_err_context(|| { + format!("cannot read directory {}", my_stat.path.quote()) + })))?; + return Ok(my_stat); } }; @@ -354,44 +352,48 @@ fn du( } } } - futures.push(du( + + let this_stat = du( this_stat, options, depth + 1, seen_inodes, exclude, - )); + print_tx, + )?; + + if !options.separate_dirs { + my_stat.size += this_stat.size; + my_stat.blocks += this_stat.blocks; + my_stat.inodes += this_stat.inodes; + } + print_tx.send(Ok(StatPrintInfo { + stat: this_stat, + depth: depth + 1, + }))?; } else { my_stat.size += this_stat.size; my_stat.blocks += this_stat.blocks; my_stat.inodes += 1; if options.all { - stats.push(this_stat); + print_tx.send(Ok(StatPrintInfo { + stat: this_stat, + depth: depth + 1, + }))?; } } } - Err(e) => show!( - e.map_err_context(|| format!("cannot access {}", entry.path().quote())) - ), + Err(e) => print_tx.send(Err(e.map_err_context(|| { + format!("cannot access {}", entry.path().quote()) + })))?, } } - Err(error) => show_error!("{}", error), + Err(error) => print_tx.send(Err(error.into()))?, } } } - stats.extend(futures.into_iter().flatten().filter(|stat| { - if !options.separate_dirs && stat.path.parent().unwrap() == my_stat.path { - my_stat.size += stat.size; - my_stat.blocks += stat.blocks; - my_stat.inodes += stat.inodes; - } - options - .max_depth - .map_or(true, |max_depth| depth < max_depth) - })); - stats.push(my_stat); - Box::new(stats.into_iter()) + Ok(my_stat) } fn convert_size_human(size: u64, multiplier: u64, _block_size: u64) -> String { @@ -426,7 +428,7 @@ fn convert_size_other(size: u64, _multiplier: u64, block_size: u64) -> String { format!("{}", ((size as f64) / (block_size as f64)).ceil()) } -fn get_convert_size_fn(matches: &ArgMatches) -> Box String> { +fn get_convert_size_fn(matches: &ArgMatches) -> Box String + Send> { if matches.get_flag(options::HUMAN_READABLE) || matches.get_flag(options::SI) { Box::new(convert_size_human) } else if matches.get_flag(options::BYTES) { @@ -532,6 +534,137 @@ fn build_exclude_patterns(matches: &ArgMatches) -> UResult> { Ok(exclude_patterns) } +struct StatPrintInfo { + stat: Stat, + depth: usize, +} + +struct StatPrinter { + matches: ArgMatches, + threshold: Option, + summarize: bool, + time_format_str: String, + line_ending: LineEnding, + options: Options, + convert_size: Box String + Send>, +} + +impl StatPrinter { + fn new(matches: ArgMatches, options: Options, summarize: bool) -> UResult { + let block_size = read_block_size( + matches + .get_one::(options::BLOCK_SIZE) + .map(|s| s.as_str()), + )?; + + let multiplier: u64 = if matches.get_flag(options::SI) { + 1000 + } else { + 1024 + }; + + let convert_size_fn = get_convert_size_fn(&matches); + + let convert_size: Box String + Send> = if options.inodes { + Box::new(|size: u64| size.to_string()) + } else { + Box::new(move |size: u64| convert_size_fn(size, multiplier, block_size)) + }; + + let threshold = match matches.get_one::(options::THRESHOLD) { + Some(s) => match Threshold::from_str(s) { + Ok(t) => Some(t), + Err(e) => { + return Err(USimpleError::new( + 1, + format_error_message(&e, s, options::THRESHOLD), + )) + } + }, + None => None, + }; + + let time_format_str = + parse_time_style(matches.get_one::("time-style").map(|s| s.as_str()))? + .to_string(); + + let line_ending = LineEnding::from_zero_flag(matches.get_flag(options::NULL)); + + Ok(Self { + matches, + threshold, + summarize, + time_format_str, + line_ending, + options, + convert_size, + }) + } + + fn print_stats(&self, rx: &mpsc::Receiver>) -> UResult<()> { + let mut grand_total = 0; + loop { + let received = rx.recv(); + + match received { + Ok(message) => match message { + Ok(stat_info) => { + let size = choose_size(&self.matches, &stat_info.stat); + + if stat_info.depth == 0 { + grand_total += size; + } + + if !self + .threshold + .map_or(false, |threshold| threshold.should_exclude(size)) + && self + .options + .max_depth + .map_or(true, |max_depth| stat_info.depth <= max_depth) + && (!self.summarize || stat_info.depth == 0) + { + self.print_stat(&stat_info.stat, size)?; + } + } + Err(e) => show!(e), + }, + Err(_) => break, + } + } + + if self.options.total { + print!("{}\ttotal", (self.convert_size)(grand_total)); + print!("{}", self.line_ending); + } + + Ok(()) + } + + fn print_stat(&self, stat: &Stat, size: u64) -> UResult<()> { + if self.matches.contains_id(options::TIME) { + let tm = { + let secs = self + .matches + .get_one::(options::TIME) + .map(|s| get_time_secs(s, stat)) + .transpose()? + .unwrap_or(stat.modified); + DateTime::::from(UNIX_EPOCH + Duration::from_secs(secs)) + }; + let time_str = tm.format(&self.time_format_str).to_string(); + print!("{}\t{}\t", (self.convert_size)(size), time_str); + } else { + print!("{}\t", (self.convert_size)(size)); + } + + print_verbatim(&stat.path).unwrap(); + print!("{}", self.line_ending); + + Ok(()) + } +} + #[uucore::main] #[allow(clippy::cognitive_complexity)] pub fn uumain(args: impl uucore::Args) -> UResult<()> { @@ -582,49 +715,13 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { show_warning!("options --apparent-size and -b are ineffective with --inodes"); } - let block_size = read_block_size( - matches - .get_one::(options::BLOCK_SIZE) - .map(|s| s.as_str()), - )?; - - let threshold = match matches.get_one::(options::THRESHOLD) { - Some(s) => match Threshold::from_str(s) { - Ok(t) => Some(t), - Err(e) => { - return Err(USimpleError::new( - 1, - format_error_message(&e, s, options::THRESHOLD), - )) - } - }, - None => None, - }; - - let multiplier: u64 = if matches.get_flag(options::SI) { - 1000 - } else { - 1024 - }; - - let convert_size_fn = get_convert_size_fn(&matches); - - let convert_size = |size: u64| { - if options.inodes { - size.to_string() - } else { - convert_size_fn(size, multiplier, block_size) - } - }; - - let time_format_str = - parse_time_style(matches.get_one::("time-style").map(|s| s.as_str()))?; - - let line_ending = LineEnding::from_zero_flag(matches.get_flag(options::NULL)); + // Use separate thread to print output, so we can print finished results while computation is still running + let stat_printer = StatPrinter::new(matches.clone(), options.clone(), summarize)?; + let (print_tx, rx) = mpsc::channel::>(); + let printing_thread = thread::spawn(move || stat_printer.print_stats(&rx)); let excludes = build_exclude_patterns(&matches)?; - let mut grand_total = 0; 'loop_file: for path in files { // Skip if we don't want to ignore anything if !&excludes.is_empty() { @@ -647,63 +744,35 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { if let Some(inode) = stat.inode { seen_inodes.insert(inode); } - let iter = du(stat, &options, 0, &mut seen_inodes, &excludes); + let stat = du(stat, &options, 0, &mut seen_inodes, &excludes, &print_tx) + .map_err(|e| USimpleError::new(1, e.to_string()))?; - // Sum up all the returned `Stat`s and display results - let (_, len) = iter.size_hint(); - let len = len.unwrap(); - for (index, stat) in iter.enumerate() { - let size = choose_size(&matches, &stat); - - if threshold.map_or(false, |threshold| threshold.should_exclude(size)) { - continue; - } - - if matches.contains_id(options::TIME) { - let tm = { - let secs = matches - .get_one::(options::TIME) - .map(|s| get_time_secs(s, &stat)) - .transpose()? - .unwrap_or(stat.modified); - DateTime::::from(UNIX_EPOCH + Duration::from_secs(secs)) - }; - if !summarize || index == len - 1 { - let time_str = tm.format(time_format_str).to_string(); - print!("{}\t{}\t", convert_size(size), time_str); - print_verbatim(stat.path).unwrap(); - print!("{line_ending}"); - } - } else if !summarize || index == len - 1 { - print!("{}\t", convert_size(size)); - print_verbatim(stat.path).unwrap(); - print!("{line_ending}"); - } - if options.total && index == (len - 1) { - // The last element will be the total size of the the path under - // path_string. We add it to the grand total. - grand_total += size; - } - } + print_tx + .send(Ok(StatPrintInfo { stat, depth: 0 })) + .map_err(|e| USimpleError::new(1, e.to_string()))?; } else { - show_error!( - "{}: {}", - path.to_string_lossy().maybe_quote(), - "No such file or directory" - ); - set_exit_code(1); + print_tx + .send(Err(USimpleError::new( + 1, + format!( + "{}: No such file or directory", + path.to_string_lossy().maybe_quote() + ), + ))) + .map_err(|e| USimpleError::new(1, e.to_string()))?; } } - if options.total { - print!("{}\ttotal", convert_size(grand_total)); - print!("{line_ending}"); - } + drop(print_tx); + + printing_thread + .join() + .map_err(|_| USimpleError::new(1, "Printing thread panicked."))??; Ok(()) } -fn get_time_secs(s: &str, stat: &Stat) -> std::result::Result { +fn get_time_secs(s: &str, stat: &Stat) -> Result { let secs = match s { "ctime" | "status" => stat.modified, "access" | "atime" | "use" => stat.accessed, @@ -966,7 +1035,7 @@ enum Threshold { impl FromStr for Threshold { type Err = ParseSizeError; - fn from_str(s: &str) -> std::result::Result { + fn from_str(s: &str) -> Result { let offset = usize::from(s.starts_with(&['-', '+'][..])); let size = parse_size_u64(&s[offset..])?;