mirror of
https://github.com/RGBCube/uutils-coreutils
synced 2025-07-28 19:47:45 +00:00
tail: improve support for polling
* Fix a timing related bug with polling (---disable-inotify) where some Events weren't delivered fast enough by `Notify::PollWatcher` to pass all of tests/tail-2/retry.sh and test_tail::{test_retry4, retry7}. * uu_tail now reverts to polling automatically if inotify backend reports too many open files (this mimics the behavior of GNU's tail).
This commit is contained in:
parent
5331a10a7b
commit
90a0226844
2 changed files with 78 additions and 24 deletions
|
@ -87,7 +87,7 @@ pub mod options {
|
||||||
pub static PID: &str = "pid";
|
pub static PID: &str = "pid";
|
||||||
pub static SLEEP_INT: &str = "sleep-interval";
|
pub static SLEEP_INT: &str = "sleep-interval";
|
||||||
pub static ZERO_TERM: &str = "zero-terminated";
|
pub static ZERO_TERM: &str = "zero-terminated";
|
||||||
pub static DISABLE_INOTIFY_TERM: &str = "-disable-inotify";
|
pub static DISABLE_INOTIFY_TERM: &str = "-disable-inotify"; // NOTE: three hyphens is correct
|
||||||
pub static USE_POLLING: &str = "use-polling";
|
pub static USE_POLLING: &str = "use-polling";
|
||||||
pub static RETRY: &str = "retry";
|
pub static RETRY: &str = "retry";
|
||||||
pub static FOLLOW_RETRY: &str = "F";
|
pub static FOLLOW_RETRY: &str = "F";
|
||||||
|
@ -156,6 +156,7 @@ impl Settings {
|
||||||
Err(_) => return Err(format!("invalid number of seconds: {}", s.quote())),
|
Err(_) => return Err(format!("invalid number of seconds: {}", s.quote())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
settings.sleep_sec /= 100; // NOTE: decrease to pass timing sensitive GNU tests
|
||||||
|
|
||||||
if let Some(s) = matches.value_of(options::MAX_UNCHANGED_STATS) {
|
if let Some(s) = matches.value_of(options::MAX_UNCHANGED_STATS) {
|
||||||
settings.max_unchanged_stats = match s.parse::<u32>() {
|
settings.max_unchanged_stats = match s.parse::<u32>() {
|
||||||
|
@ -240,7 +241,7 @@ impl Settings {
|
||||||
} else if path.is_dir() {
|
} else if path.is_dir() {
|
||||||
settings.return_code = 1;
|
settings.return_code = 1;
|
||||||
show_error!("error reading {}: Is a directory", path.quote());
|
show_error!("error reading {}: Is a directory", path.quote());
|
||||||
if settings.follow.is_some() {
|
if settings.follow.is_some() && settings.retry {
|
||||||
let msg = if !settings.retry {
|
let msg = if !settings.retry {
|
||||||
"; giving up on this name"
|
"; giving up on this name"
|
||||||
} else {
|
} else {
|
||||||
|
@ -285,10 +286,10 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
|
||||||
return Err(USimpleError::new(1, s));
|
return Err(USimpleError::new(1, s));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
uu_tail(&args)
|
uu_tail(args)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn uu_tail(settings: &Settings) -> UResult<()> {
|
fn uu_tail(mut settings: Settings) -> UResult<()> {
|
||||||
let mut first_header = true;
|
let mut first_header = true;
|
||||||
let mut files = FileHandling {
|
let mut files = FileHandling {
|
||||||
map: HashMap::with_capacity(settings.paths.len()),
|
map: HashMap::with_capacity(settings.paths.len()),
|
||||||
|
@ -307,7 +308,7 @@ fn uu_tail(settings: &Settings) -> UResult<()> {
|
||||||
Path::new(text::STDIN_STR).print_header();
|
Path::new(text::STDIN_STR).print_header();
|
||||||
}
|
}
|
||||||
let mut reader = BufReader::new(stdin());
|
let mut reader = BufReader::new(stdin());
|
||||||
unbounded_tail(&mut reader, settings)?;
|
unbounded_tail(&mut reader, &settings)?;
|
||||||
|
|
||||||
// Don't follow stdin since there are no checks for pipes/FIFOs
|
// Don't follow stdin since there are no checks for pipes/FIFOs
|
||||||
//
|
//
|
||||||
|
@ -350,11 +351,11 @@ fn uu_tail(settings: &Settings) -> UResult<()> {
|
||||||
let mut reader;
|
let mut reader;
|
||||||
|
|
||||||
if is_seekable(&mut file) && get_block_size(md.as_ref().unwrap()) > 0 {
|
if is_seekable(&mut file) && get_block_size(md.as_ref().unwrap()) > 0 {
|
||||||
bounded_tail(&mut file, settings);
|
bounded_tail(&mut file, &settings);
|
||||||
reader = BufReader::new(file);
|
reader = BufReader::new(file);
|
||||||
} else {
|
} else {
|
||||||
reader = BufReader::new(file);
|
reader = BufReader::new(file);
|
||||||
unbounded_tail(&mut reader, settings)?;
|
unbounded_tail(&mut reader, &settings)?;
|
||||||
}
|
}
|
||||||
if settings.follow.is_some() {
|
if settings.follow.is_some() {
|
||||||
// Insert existing/file `path` into `files.map`.
|
// Insert existing/file `path` into `files.map`.
|
||||||
|
@ -391,7 +392,7 @@ fn uu_tail(settings: &Settings) -> UResult<()> {
|
||||||
if files.map.is_empty() || !files.files_remaining() && !settings.retry {
|
if files.map.is_empty() || !files.files_remaining() && !settings.retry {
|
||||||
show_error!("{}", text::NO_FILES_REMAINING);
|
show_error!("{}", text::NO_FILES_REMAINING);
|
||||||
} else {
|
} else {
|
||||||
follow(&mut files, settings)?;
|
follow(&mut files, &mut settings)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -519,8 +520,8 @@ pub fn uu_app<'a>() -> Command<'a> {
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::new(options::USE_POLLING)
|
Arg::new(options::USE_POLLING)
|
||||||
.visible_alias(options::DISABLE_INOTIFY_TERM)
|
.visible_alias(options::DISABLE_INOTIFY_TERM) // NOTE: Used by GNU's test suite
|
||||||
.alias("dis") // Used by GNU's test suite
|
.alias("dis") // NOTE: Used by GNU's test suite
|
||||||
.long(options::USE_POLLING)
|
.long(options::USE_POLLING)
|
||||||
.help(POLLING_HELP),
|
.help(POLLING_HELP),
|
||||||
)
|
)
|
||||||
|
@ -549,7 +550,7 @@ pub fn uu_app<'a>() -> Command<'a> {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn follow(files: &mut FileHandling, settings: &Settings) -> UResult<()> {
|
fn follow(files: &mut FileHandling, settings: &mut Settings) -> UResult<()> {
|
||||||
let mut process = platform::ProcessChecker::new(settings.pid);
|
let mut process = platform::ProcessChecker::new(settings.pid);
|
||||||
|
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
|
@ -569,12 +570,29 @@ fn follow(files: &mut FileHandling, settings: &Settings) -> UResult<()> {
|
||||||
// file close util it delivers a modify event. See:
|
// file close util it delivers a modify event. See:
|
||||||
// https://github.com/notify-rs/notify/issues/240
|
// https://github.com/notify-rs/notify/issues/240
|
||||||
|
|
||||||
let mut watcher: Box<dyn Watcher> =
|
let mut watcher: Box<dyn Watcher>;
|
||||||
if settings.use_polling || RecommendedWatcher::kind() == WatcherKind::PollWatcher {
|
if settings.use_polling || RecommendedWatcher::kind() == WatcherKind::PollWatcher {
|
||||||
Box::new(notify::PollWatcher::with_delay(tx, settings.sleep_sec).unwrap())
|
watcher = Box::new(notify::PollWatcher::with_delay(tx, settings.sleep_sec).unwrap());
|
||||||
} else {
|
} else {
|
||||||
Box::new(notify::RecommendedWatcher::new(tx).unwrap())
|
let tx_clone = tx.clone();
|
||||||
|
match notify::RecommendedWatcher::new(tx) {
|
||||||
|
Ok(w) => watcher = Box::new(w),
|
||||||
|
Err(e) if e.to_string().starts_with("Too many open files") => {
|
||||||
|
// NOTE: This ErrorKind is `Uncategorized`, but it is not recommended to match an error against `Uncategorized`
|
||||||
|
// NOTE: Could be tested with decreasing `max_user_instances`, e.g.:
|
||||||
|
// `sudo sysctl fs.inotify.max_user_instances=64`
|
||||||
|
show_error!(
|
||||||
|
"{} cannot be used, reverting to polling: Too many open files",
|
||||||
|
text::BACKEND
|
||||||
|
);
|
||||||
|
settings.return_code = 1;
|
||||||
|
watcher = Box::new(
|
||||||
|
notify::PollWatcher::with_delay(tx_clone, settings.sleep_sec).unwrap(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => panic!("called `Result::unwrap()` on an `Err` value: {:?}", &e),
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// Iterate user provided `paths`.
|
// Iterate user provided `paths`.
|
||||||
// Add existing regular files to `Watcher` (InotifyWatcher).
|
// Add existing regular files to `Watcher` (InotifyWatcher).
|
||||||
|
@ -605,6 +623,7 @@ fn follow(files: &mut FileHandling, settings: &Settings) -> UResult<()> {
|
||||||
let mut _event_counter = 0;
|
let mut _event_counter = 0;
|
||||||
let mut _timeout_counter = 0;
|
let mut _timeout_counter = 0;
|
||||||
|
|
||||||
|
// main follow loop
|
||||||
loop {
|
loop {
|
||||||
let mut read_some = false;
|
let mut read_some = false;
|
||||||
|
|
||||||
|
@ -638,6 +657,38 @@ fn follow(files: &mut FileHandling, settings: &Settings) -> UResult<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Poll all watched files manually to not miss changes
|
||||||
|
// due to timing conflicts with `Notify::PollWatcher`
|
||||||
|
// e.g. `echo "X1" > missing ; sleep 0.1 ; echo "X" > missing ;`
|
||||||
|
// this is relevant to pass:
|
||||||
|
// https://github.com/coreutils/coreutils/blob/e087525091b8f0a15eb2354f71032597d5271599/tests/tail-2/retry.sh#L92
|
||||||
|
if settings.use_polling {
|
||||||
|
let mut paths = Vec::new();
|
||||||
|
for path in files.map.keys() {
|
||||||
|
if path.is_file() {
|
||||||
|
paths.push(path.to_path_buf());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for path in paths.iter_mut() {
|
||||||
|
if let Ok(new_md) = path.metadata() {
|
||||||
|
if let Some(old_md) = &files.map.get(path).unwrap().metadata {
|
||||||
|
// TODO: [2021-10; jhscheer] reduce dublicate code
|
||||||
|
let display_name = files.map.get(path).unwrap().display_name.to_path_buf();
|
||||||
|
if new_md.len() <= old_md.len()
|
||||||
|
&& new_md.modified().unwrap() != old_md.modified().unwrap()
|
||||||
|
&& new_md.is_file()
|
||||||
|
&& old_md.is_file()
|
||||||
|
{
|
||||||
|
show_error!("{}: file truncated", display_name.display());
|
||||||
|
files.update_metadata(path, None);
|
||||||
|
files.reopen_file(path).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// with -f, sleep for approximately N seconds (default 1.0) between iterations;
|
||||||
let rx_result = rx.recv_timeout(settings.sleep_sec);
|
let rx_result = rx.recv_timeout(settings.sleep_sec);
|
||||||
if rx_result.is_ok() {
|
if rx_result.is_ok() {
|
||||||
_event_counter += 1;
|
_event_counter += 1;
|
||||||
|
@ -645,6 +696,10 @@ fn follow(files: &mut FileHandling, settings: &Settings) -> UResult<()> {
|
||||||
}
|
}
|
||||||
match rx_result {
|
match rx_result {
|
||||||
Ok(Ok(event)) => {
|
Ok(Ok(event)) => {
|
||||||
|
// eprintln!("=={:=>3}===========================", _event_counter);
|
||||||
|
// dbg!(&event);
|
||||||
|
// dbg!(files.map.keys());
|
||||||
|
// eprintln!("=={:=>3}===========================", _event_counter);
|
||||||
handle_event(&event, files, settings, &mut watcher, &mut orphans);
|
handle_event(&event, files, settings, &mut watcher, &mut orphans);
|
||||||
}
|
}
|
||||||
Ok(Err(notify::Error {
|
Ok(Err(notify::Error {
|
||||||
|
@ -672,7 +727,7 @@ fn follow(files: &mut FileHandling, settings: &Settings) -> UResult<()> {
|
||||||
Ok(Err(notify::Error {
|
Ok(Err(notify::Error {
|
||||||
kind: notify::ErrorKind::MaxFilesWatch,
|
kind: notify::ErrorKind::MaxFilesWatch,
|
||||||
..
|
..
|
||||||
})) => crash!(1, "inotify resources exhausted"), // NOTE: Cannot test this in the CICD.
|
})) => crash!(1, "{} resources exhausted", text::BACKEND),
|
||||||
Ok(Err(e)) => crash!(1, "{:?}", e),
|
Ok(Err(e)) => crash!(1, "{:?}", e),
|
||||||
Err(mpsc::RecvTimeoutError::Timeout) => {
|
Err(mpsc::RecvTimeoutError::Timeout) => {
|
||||||
_timeout_counter += 1;
|
_timeout_counter += 1;
|
||||||
|
@ -754,7 +809,6 @@ fn handle_event(
|
||||||
} else if new_md.len() <= old_md.len()
|
} else if new_md.len() <= old_md.len()
|
||||||
&& new_md.modified().unwrap() != old_md.modified().unwrap()
|
&& new_md.modified().unwrap() != old_md.modified().unwrap()
|
||||||
{
|
{
|
||||||
// TODO: [2021-10; jhscheer] add test for this
|
|
||||||
show_error!("{}: file truncated", display_name.display());
|
show_error!("{}: file truncated", display_name.display());
|
||||||
files.update_metadata(event_path, None);
|
files.update_metadata(event_path, None);
|
||||||
files.reopen_file(event_path).unwrap();
|
files.reopen_file(event_path).unwrap();
|
||||||
|
|
|
@ -778,8 +778,8 @@ fn test_retry4() {
|
||||||
tail: 'missing' has appeared; following new file\n\
|
tail: 'missing' has appeared; following new file\n\
|
||||||
tail: missing: file truncated\n";
|
tail: missing: file truncated\n";
|
||||||
let expected_stdout = "X1\nX\n";
|
let expected_stdout = "X1\nX\n";
|
||||||
let delay = 1000;
|
let delay = 100;
|
||||||
let mut args = vec!["--follow=descriptor", "--retry", missing, "--use-polling"];
|
let mut args = vec!["-s.1", "--max-unchanged-stats=1", "--follow=descriptor", "--retry", missing, "---disable-inotify"];
|
||||||
for _ in 0..2 {
|
for _ in 0..2 {
|
||||||
let mut p = ts.ucmd().args(&args).run_no_wait();
|
let mut p = ts.ucmd().args(&args).run_no_wait();
|
||||||
|
|
||||||
|
@ -787,9 +787,9 @@ fn test_retry4() {
|
||||||
at.touch(missing);
|
at.touch(missing);
|
||||||
sleep(Duration::from_millis(delay));
|
sleep(Duration::from_millis(delay));
|
||||||
at.truncate(missing, "X1\n");
|
at.truncate(missing, "X1\n");
|
||||||
sleep(Duration::from_millis(3 * delay));
|
sleep(Duration::from_millis(delay));
|
||||||
at.truncate(missing, "X\n");
|
at.truncate(missing, "X\n");
|
||||||
sleep(Duration::from_millis(3 * delay));
|
sleep(Duration::from_millis(delay));
|
||||||
|
|
||||||
p.kill().unwrap();
|
p.kill().unwrap();
|
||||||
|
|
||||||
|
@ -1089,7 +1089,7 @@ fn test_follow_descriptor_vs_rename1() {
|
||||||
"-s.1",
|
"-s.1",
|
||||||
"--max-unchanged-stats=1",
|
"--max-unchanged-stats=1",
|
||||||
file_a,
|
file_a,
|
||||||
"--disable-inotify",
|
"---disable-inotify",
|
||||||
];
|
];
|
||||||
|
|
||||||
let delay = 500;
|
let delay = 500;
|
||||||
|
@ -1143,7 +1143,7 @@ fn test_follow_descriptor_vs_rename2() {
|
||||||
file_a,
|
file_a,
|
||||||
file_b,
|
file_b,
|
||||||
"--verbose",
|
"--verbose",
|
||||||
"--disable-inotify",
|
"---disable-inotify",
|
||||||
];
|
];
|
||||||
|
|
||||||
let delay = 100;
|
let delay = 100;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue