diff --git a/src/uu/split/src/platform/unix.rs b/src/uu/split/src/platform/unix.rs index c2bf7216b..1fd990e0a 100644 --- a/src/uu/split/src/platform/unix.rs +++ b/src/uu/split/src/platform/unix.rs @@ -117,22 +117,37 @@ impl Drop for FilterWriter { pub fn instantiate_current_writer( filter: &Option, filename: &str, + is_new: bool, ) -> Result>> { match filter { - None => Ok(BufWriter::new(Box::new( - // write to the next file - std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(std::path::Path::new(&filename)) - .map_err(|_| { - Error::new( - ErrorKind::Other, - format!("unable to open '{filename}'; aborting"), - ) - })?, - ) as Box)), + None => { + let file = if is_new { + // create new file + std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(std::path::Path::new(&filename)) + .map_err(|_| { + Error::new( + ErrorKind::Other, + format!("unable to open '{filename}'; aborting"), + ) + })? + } else { + // re-open file that we previously created to append to it + std::fs::OpenOptions::new() + .append(true) + .open(std::path::Path::new(&filename)) + .map_err(|_| { + Error::new( + ErrorKind::Other, + format!("unable to re-open '{filename}'; aborting"), + ) + })? 
+ }; + Ok(BufWriter::new(Box::new(file) as Box)) + } Some(ref filter_command) => Ok(BufWriter::new(Box::new( // spawn a shell command and write to it FilterWriter::new(filter_command, filename)?, diff --git a/src/uu/split/src/platform/windows.rs b/src/uu/split/src/platform/windows.rs index 8b9078989..a531d6abc 100644 --- a/src/uu/split/src/platform/windows.rs +++ b/src/uu/split/src/platform/windows.rs @@ -14,9 +14,10 @@ use uucore::fs; pub fn instantiate_current_writer( _filter: &Option, filename: &str, + is_new: bool, ) -> Result>> { - Ok(BufWriter::new(Box::new( - // write to the next file + let file = if is_new { + // create new file std::fs::OpenOptions::new() .write(true) .create(true) @@ -25,10 +26,22 @@ pub fn instantiate_current_writer( .map_err(|_| { Error::new( ErrorKind::Other, - format!("'{filename}' would overwrite input; aborting"), + format!("unable to open '{filename}'; aborting"), ) - })?, - ) as Box)) + })? + } else { + // re-open file that we previously created to append to it + std::fs::OpenOptions::new() + .append(true) + .open(std::path::Path::new(&filename)) + .map_err(|_| { + Error::new( + ErrorKind::Other, + format!("unable to re-open '{filename}'; aborting"), + ) + })? + }; + Ok(BufWriter::new(Box::new(file) as Box)) } pub fn paths_refer_to_same_file(p1: &str, p2: &str) -> bool { diff --git a/src/uu/split/src/split.rs b/src/uu/split/src/split.rs index 4e2af0be4..a837bcb21 100644 --- a/src/uu/split/src/split.rs +++ b/src/uu/split/src/split.rs @@ -3,7 +3,7 @@ // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. 
-// spell-checker:ignore nbbbb ncccc hexdigit +// spell-checker:ignore nbbbb ncccc hexdigit getmaxstdio mod filenames; mod number; @@ -563,7 +563,11 @@ impl Settings { Ok(result) } - fn instantiate_current_writer(&self, filename: &str) -> io::Result>> { + fn instantiate_current_writer( + &self, + filename: &str, + is_new: bool, + ) -> io::Result>> { if platform::paths_refer_to_same_file(&self.input, filename) { return Err(io::Error::new( ErrorKind::Other, @@ -571,7 +575,7 @@ impl Settings { )); } - platform::instantiate_current_writer(&self.filter, filename) + platform::instantiate_current_writer(&self.filter, filename, is_new) } } @@ -618,7 +622,7 @@ fn custom_write_all( /// Get the size of the input file in bytes /// Used only for subset of `--number=CHUNKS` strategy, as there is a need -/// to determine input file size upfront in order to know chunk size +/// to determine input file size upfront in order to estimate the chunk size /// to be written into each of N files/chunks: /// * N split into N files based on size of input /// * K/N output Kth of N to stdout @@ -748,7 +752,7 @@ impl<'a> ByteChunkWriter<'a> { if settings.verbose { println!("creating file {}", filename.quote()); } - let inner = settings.instantiate_current_writer(&filename)?; + let inner = settings.instantiate_current_writer(&filename, true)?; Ok(ByteChunkWriter { settings, chunk_size, @@ -786,7 +790,7 @@ impl<'a> Write for ByteChunkWriter<'a> { if self.settings.verbose { println!("creating file {}", filename.quote()); } - self.inner = self.settings.instantiate_current_writer(&filename)?; + self.inner = self.settings.instantiate_current_writer(&filename, true)?; } // If the capacity of this chunk is greater than the number of @@ -872,7 +876,7 @@ impl<'a> LineChunkWriter<'a> { if settings.verbose { println!("creating file {}", filename.quote()); } - let inner = settings.instantiate_current_writer(&filename)?; + let inner = settings.instantiate_current_writer(&filename, true)?; Ok(LineChunkWriter 
{ settings, chunk_size, @@ -907,7 +911,7 @@ impl<'a> Write for LineChunkWriter<'a> { if self.settings.verbose { println!("creating file {}", filename.quote()); } - self.inner = self.settings.instantiate_current_writer(&filename)?; + self.inner = self.settings.instantiate_current_writer(&filename, true)?; self.num_lines_remaining_in_current_chunk = self.chunk_size; } @@ -979,7 +983,7 @@ impl<'a> LineBytesChunkWriter<'a> { if settings.verbose { println!("creating file {}", filename.quote()); } - let inner = settings.instantiate_current_writer(&filename)?; + let inner = settings.instantiate_current_writer(&filename, true)?; Ok(LineBytesChunkWriter { settings, chunk_size, @@ -1045,7 +1049,7 @@ impl<'a> Write for LineBytesChunkWriter<'a> { if self.settings.verbose { println!("creating file {}", filename.quote()); } - self.inner = self.settings.instantiate_current_writer(&filename)?; + self.inner = self.settings.instantiate_current_writer(&filename, true)?; self.num_bytes_remaining_in_current_chunk = self.chunk_size.try_into().unwrap(); } @@ -1134,57 +1138,120 @@ impl<'a> Write for LineBytesChunkWriter<'a> { struct OutFile { filename: String, maybe_writer: Option>>, + is_new: bool, } -impl OutFile { - /// Get the writer for the output file - /// Instantiate the writer if it has not been instantiated upfront - fn get_writer(&mut self, settings: &Settings) -> UResult<&mut BufWriter>> { - if self.maybe_writer.is_some() { - Ok(self.maybe_writer.as_mut().unwrap()) +/// A set of output files +/// Used in [`n_chunks_by_byte`], [`n_chunks_by_line`] +/// and [`n_chunks_by_line_round_robin`] functions. +type OutFiles = Vec; +trait ManageOutFiles { + /// Initialize a new set of output files + /// Each OutFile is generated with filename, while the writer for it could be + /// optional, to be instantiated later by the calling function as needed. 
+ /// Optional writers could happen in the following situations: + /// * in [`n_chunks_by_line`] and [`n_chunks_by_line_round_robin`] if `elide_empty_files` parameter is set to `true` + /// * if the number of files is greater than system limit for open files + fn init(num_files: u64, settings: &Settings, is_writer_optional: bool) -> UResult + where + Self: Sized; + /// Get the writer for the output file by index. + /// If system limit of open files has been reached + /// it will try to close one of previously instantiated writers + /// to free up resources and re-try instantiating current writer, + /// except for `--filter` mode. + /// The writers that get closed to free up resources for the current writer + /// are flagged as `is_new=false`, so they can be re-opened for appending + /// instead of created anew if we need to keep writing into them later, + /// i.e. in case of round robin distribution as in [`n_chunks_by_line_round_robin`] + fn get_writer( + &mut self, + idx: usize, + settings: &Settings, + ) -> UResult<&mut BufWriter>>; +} + +impl ManageOutFiles for OutFiles { + fn init(num_files: u64, settings: &Settings, is_writer_optional: bool) -> UResult { + // This object is responsible for creating the filename for each chunk + let mut filename_iterator: FilenameIterator<'_> = + FilenameIterator::new(&settings.prefix, &settings.suffix) + .map_err(|e| io::Error::new(ErrorKind::Other, format!("{e}")))?; + let mut out_files: Self = Self::new(); + for _ in 0..num_files { + let filename = filename_iterator + .next() + .ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?; + let maybe_writer = if is_writer_optional { + None + } else { + let instantiated = settings.instantiate_current_writer(filename.as_str(), true); + // If there was an error instantiating the writer for a file, + // it could be due to hitting the system limit of open files, + // so record it as None and let [`get_writer`] function handle closing/re-opening + // of writers as 
needed within system limits. + // However, for `--filter` child process writers - propagate the error, + // as working around system limits of open files for child shell processes + // is currently not supported (same as in GNU) + match instantiated { + Ok(writer) => Some(writer), + Err(e) if settings.filter.is_some() => { + return Err(e.into()); + } + Err(_) => None, + } + }; + out_files.push(OutFile { + filename, + maybe_writer, + is_new: true, + }); + } + Ok(out_files) + } + + fn get_writer( + &mut self, + idx: usize, + settings: &Settings, + ) -> UResult<&mut BufWriter>> { + if self[idx].maybe_writer.is_some() { + Ok(self[idx].maybe_writer.as_mut().unwrap()) } else { - // Writer was not instantiated upfront - // Instantiate it and record for future use - self.maybe_writer = Some(settings.instantiate_current_writer(self.filename.as_str())?); - Ok(self.maybe_writer.as_mut().unwrap()) + // Writer was not instantiated upfront or was temporarily closed due to system resources constraints. + // Instantiate it and record for future use. + let maybe_writer = + settings.instantiate_current_writer(self[idx].filename.as_str(), self[idx].is_new); + if let Ok(writer) = maybe_writer { + self[idx].maybe_writer = Some(writer); + Ok(self[idx].maybe_writer.as_mut().unwrap()) + } else if settings.filter.is_some() { + // Propagate error if in `--filter` mode + Err(maybe_writer.err().unwrap().into()) + } else { + // Could have hit system limit for open files. 
+ // Try to close one previously instantiated writer first + for (i, out_file) in self.iter_mut().enumerate() { + if i != idx && out_file.maybe_writer.is_some() { + out_file.maybe_writer.as_mut().unwrap().flush()?; + out_file.maybe_writer = None; + out_file.is_new = false; + break; + } + } + // And then try to instantiate the writer again + // If this fails - give up and propagate the error + self[idx].maybe_writer = + Some(settings.instantiate_current_writer( + self[idx].filename.as_str(), + self[idx].is_new, + )?); + Ok(self[idx].maybe_writer.as_mut().unwrap()) + } } } } -/// Generate a set of Output Files -/// This is a helper function to [`n_chunks_by_byte`], [`n_chunks_by_line`] -/// and [`n_chunks_by_line_round_robin`]. -/// Each OutFile is generated with filename, while the writer for it could be -/// optional, to be instantiated later by the calling function as needed. -/// Optional writers could happen in [`n_chunks_by_line`] -/// if `elide_empty_files` parameter is set to `true`. -fn get_out_files( - num_files: u64, - settings: &Settings, - is_writer_optional: bool, -) -> UResult> { - // This object is responsible for creating the filename for each chunk - let mut filename_iterator: FilenameIterator<'_> = - FilenameIterator::new(&settings.prefix, &settings.suffix) - .map_err(|e| io::Error::new(ErrorKind::Other, format!("{e}")))?; - let mut out_files: Vec = Vec::new(); - for _ in 0..num_files { - let filename = filename_iterator - .next() - .ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?; - let maybe_writer = if is_writer_optional { - None - } else { - Some(settings.instantiate_current_writer(filename.as_str())?) - }; - out_files.push(OutFile { - filename, - maybe_writer, - }); - } - Ok(out_files) -} - /// Split a file or STDIN into a specific number of chunks by byte. 
/// /// When file size cannot be evenly divided into the number of chunks of the same size, @@ -1261,7 +1328,7 @@ where // In Kth chunk of N mode - we will write to stdout instead of to a file. let mut stdout_writer = std::io::stdout().lock(); // In N chunks mode - we will write to `num_chunks` files - let mut out_files: Vec = Vec::new(); + let mut out_files: OutFiles = OutFiles::new(); // Calculate chunk size base and modulo reminder // to be used in calculating chunk_size later on @@ -1273,7 +1340,7 @@ where // This will create each of the underlying files // or stdin pipes to child shell/command processes if in `--filter` mode if kth_chunk.is_none() { - out_files = get_out_files(num_chunks, settings, false)?; + out_files = OutFiles::init(num_chunks, settings, false)?; } for i in 1_u64..=num_chunks { @@ -1317,7 +1384,7 @@ where } None => { let idx = (i - 1) as usize; - let writer = out_files[idx].get_writer(settings)?; + let writer = out_files.get_writer(idx, settings)?; writer.write_all(buf)?; } } @@ -1387,7 +1454,7 @@ where // In Kth chunk of N mode - we will write to stdout instead of to a file. let mut stdout_writer = std::io::stdout().lock(); // In N chunks mode - we will write to `num_chunks` files - let mut out_files: Vec = Vec::new(); + let mut out_files: OutFiles = OutFiles::new(); // Calculate chunk size base and modulo reminder // to be used in calculating `num_bytes_should_be_written` later on @@ -1402,7 +1469,7 @@ where // Otherwise keep writer optional, to be instantiated later if there is data // to write for the associated chunk. 
if kth_chunk.is_none() { - out_files = get_out_files(num_chunks, settings, settings.elide_empty_files)?; + out_files = OutFiles::init(num_chunks, settings, settings.elide_empty_files)?; } let mut chunk_number = 1; @@ -1429,7 +1496,7 @@ where None => { // Should write into a file let idx = (chunk_number - 1) as usize; - let writer = out_files[idx].get_writer(settings)?; + let writer = out_files.get_writer(idx, settings)?; custom_write_all(bytes, writer, settings)?; } } @@ -1467,7 +1534,11 @@ where } /// Split a file or STDIN into a specific number of chunks by line, but -/// assign lines via round-robin +/// assign lines via round-robin. +/// Note: There is no need to know the size of the input upfront for this method, +/// since the lines are assigned to chunks in round-robin order and the size of each chunk +/// does not need to be estimated. As a result, "infinite" inputs are supported +/// for this method, e.g. `yes | split -n r/10` or `yes | split -n r/3/11` /// /// In Kth chunk of N mode - writes to stdout the contents of the chunk identified by `kth_chunk` /// @@ -1503,48 +1574,51 @@ where // In Kth chunk of N mode - we will write to stdout instead of to a file. let mut stdout_writer = std::io::stdout().lock(); // In N chunks mode - we will write to `num_chunks` files - let mut out_files: Vec = Vec::new(); + let mut out_files: OutFiles = OutFiles::new(); // If in N chunks mode // Create one writer for each chunk. 
// This will create each of the underlying files // or stdin pipes to child shell/command processes if in `--filter` mode if kth_chunk.is_none() { - out_files = get_out_files(num_chunks, settings, false)?; + out_files = OutFiles::init(num_chunks, settings, settings.elide_empty_files)?; } let num_chunks: usize = num_chunks.try_into().unwrap(); let sep = settings.separator; let mut closed_writers = 0; - for (i, line_result) in reader.split(sep).enumerate() { - // add separator back in at the end of the line - let mut line = line_result?; - line.push(sep); - let bytes = line.as_slice(); + let mut i = 0; + loop { + let line = &mut Vec::new(); + let num_bytes_read = reader.by_ref().read_until(sep, line)?; + + // if there is nothing else to read - exit the loop + if num_bytes_read == 0 { + break; + }; + + let bytes = line.as_slice(); match kth_chunk { Some(chunk_number) => { - // The `.enumerate()` method returns index `i` starting with 0, - // but chunk number is given as a 1-indexed number, - // so compare to `chunk_number - 1` if (i % num_chunks) == (chunk_number - 1) as usize { stdout_writer.write_all(bytes)?; } } None => { - let writer = out_files[i % num_chunks].get_writer(settings)?; + let writer = out_files.get_writer(i % num_chunks, settings)?; let writer_stdin_open = custom_write_all(bytes, writer, settings)?; if !writer_stdin_open { closed_writers += 1; - if closed_writers == num_chunks { - // all writers are closed - stop reading - break; - } } } } + i += 1; + if closed_writers == num_chunks { + // all writers are closed - stop reading + break; + } } - Ok(()) } diff --git a/tests/by-util/test_split.rs b/tests/by-util/test_split.rs index 2c9a56bdd..acb8ab561 100644 --- a/tests/by-util/test_split.rs +++ b/tests/by-util/test_split.rs @@ -2,11 +2,13 @@ // // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. 
-// spell-checker:ignore xzaaa sixhundredfiftyonebytes ninetyonebytes threebytes asciilowercase ghijkl mnopq rstuv wxyz fivelines twohundredfortyonebytes onehundredlines nbbbb dxen ncccc +// spell-checker:ignore xzaaa sixhundredfiftyonebytes ninetyonebytes threebytes asciilowercase ghijkl mnopq rstuv wxyz fivelines twohundredfortyonebytes onehundredlines nbbbb dxen ncccc rlimit NOFILE use crate::common::util::{AtPath, TestScenario}; use rand::{thread_rng, Rng, SeedableRng}; use regex::Regex; +#[cfg(any(target_os = "linux", target_os = "android"))] +use rlimit::Resource; #[cfg(not(windows))] use std::env; use std::path::Path; @@ -1250,10 +1252,19 @@ fn test_number_by_lines_kth_no_end_sep() { .succeeds() .stdout_only("2222\n"); new_ucmd!() - .args(&["-e", "-n", "l/8/10"]) + .args(&["-e", "-n", "l/2/2"]) .pipe_in("1\n2222\n3\n4") .succeeds() - .stdout_only("3\n"); + .stdout_only("3\n4"); +} + +#[test] +fn test_number_by_lines_rr_kth_no_end_sep() { + new_ucmd!() + .args(&["-n", "r/2/3"]) + .pipe_in("1\n2\n3\n4\n5") + .succeeds() + .stdout_only("2\n5"); } #[test] @@ -1626,6 +1637,15 @@ fn test_round_robin() { assert_eq!(at.read("xab"), "2\n4\n"); } +#[test] +#[cfg(any(target_os = "linux", target_os = "android"))] +fn test_round_robin_limited_file_descriptors() { + new_ucmd!() + .args(&["-n", "r/40", "onehundredlines.txt"]) + .limit(Resource::NOFILE, 9, 9) + .succeeds(); +} + #[test] fn test_split_invalid_input() { // Test if stdout/stderr for '--lines' option is correct