1
Fork 0
mirror of https://github.com/RGBCube/uutils-coreutils synced 2025-07-29 12:07:46 +00:00

split: handling system limit on open files

This commit is contained in:
zhitkoff 2023-11-24 17:25:16 -05:00 committed by Yury Zhytkou
parent 84b5e6f0a1
commit dc92a434ef
3 changed files with 193 additions and 81 deletions

View file

@ -117,10 +117,12 @@ impl Drop for FilterWriter {
pub fn instantiate_current_writer( pub fn instantiate_current_writer(
filter: &Option<String>, filter: &Option<String>,
filename: &str, filename: &str,
is_new: bool,
) -> Result<BufWriter<Box<dyn Write>>> { ) -> Result<BufWriter<Box<dyn Write>>> {
match filter { match filter {
None => Ok(BufWriter::new(Box::new( None => {
// write to the next file let file = if is_new {
// create new file
std::fs::OpenOptions::new() std::fs::OpenOptions::new()
.write(true) .write(true)
.create(true) .create(true)
@ -131,8 +133,21 @@ pub fn instantiate_current_writer(
ErrorKind::Other, ErrorKind::Other,
format!("unable to open '{filename}'; aborting"), format!("unable to open '{filename}'; aborting"),
) )
})?, })?
) as Box<dyn Write>)), } else {
// re-open file that we previously created to append to it
std::fs::OpenOptions::new()
.append(true)
.open(std::path::Path::new(&filename))
.map_err(|_| {
Error::new(
ErrorKind::Other,
format!("unable to re-open '{filename}'; aborting"),
)
})?
};
Ok(BufWriter::new(Box::new(file) as Box<dyn Write>))
}
Some(ref filter_command) => Ok(BufWriter::new(Box::new( Some(ref filter_command) => Ok(BufWriter::new(Box::new(
// spawn a shell command and write to it // spawn a shell command and write to it
FilterWriter::new(filter_command, filename)?, FilterWriter::new(filter_command, filename)?,

View file

@ -14,9 +14,10 @@ use uucore::fs;
pub fn instantiate_current_writer( pub fn instantiate_current_writer(
_filter: &Option<String>, _filter: &Option<String>,
filename: &str, filename: &str,
is_new: bool,
) -> Result<BufWriter<Box<dyn Write>>> { ) -> Result<BufWriter<Box<dyn Write>>> {
Ok(BufWriter::new(Box::new( let file = if is_new {
// write to the next file // create new file
std::fs::OpenOptions::new() std::fs::OpenOptions::new()
.write(true) .write(true)
.create(true) .create(true)
@ -25,10 +26,22 @@ pub fn instantiate_current_writer(
.map_err(|_| { .map_err(|_| {
Error::new( Error::new(
ErrorKind::Other, ErrorKind::Other,
format!("'{filename}' would overwrite input; aborting"), format!("unable to open '{filename}'; aborting"),
) )
})?, })?
) as Box<dyn Write>)) } else {
// re-open file that we previously created to append to it
std::fs::OpenOptions::new()
.append(true)
.open(std::path::Path::new(&filename))
.map_err(|_| {
Error::new(
ErrorKind::Other,
format!("unable to re-open '{filename}'; aborting"),
)
})?
};
Ok(BufWriter::new(Box::new(file) as Box<dyn Write>))
} }
pub fn paths_refer_to_same_file(p1: &str, p2: &str) -> bool { pub fn paths_refer_to_same_file(p1: &str, p2: &str) -> bool {

View file

@ -3,7 +3,7 @@
// For the full copyright and license information, please view the LICENSE // For the full copyright and license information, please view the LICENSE
// file that was distributed with this source code. // file that was distributed with this source code.
// spell-checker:ignore nbbbb ncccc hexdigit // spell-checker:ignore nbbbb ncccc hexdigit getmaxstdio
mod filenames; mod filenames;
mod number; mod number;
@ -563,7 +563,11 @@ impl Settings {
Ok(result) Ok(result)
} }
fn instantiate_current_writer(&self, filename: &str) -> io::Result<BufWriter<Box<dyn Write>>> { fn instantiate_current_writer(
&self,
filename: &str,
is_new: bool,
) -> io::Result<BufWriter<Box<dyn Write>>> {
if platform::paths_refer_to_same_file(&self.input, filename) { if platform::paths_refer_to_same_file(&self.input, filename) {
return Err(io::Error::new( return Err(io::Error::new(
ErrorKind::Other, ErrorKind::Other,
@ -571,7 +575,7 @@ impl Settings {
)); ));
} }
platform::instantiate_current_writer(&self.filter, filename) platform::instantiate_current_writer(&self.filter, filename, is_new)
} }
} }
@ -748,7 +752,7 @@ impl<'a> ByteChunkWriter<'a> {
if settings.verbose { if settings.verbose {
println!("creating file {}", filename.quote()); println!("creating file {}", filename.quote());
} }
let inner = settings.instantiate_current_writer(&filename)?; let inner = settings.instantiate_current_writer(&filename, true)?;
Ok(ByteChunkWriter { Ok(ByteChunkWriter {
settings, settings,
chunk_size, chunk_size,
@ -786,7 +790,7 @@ impl<'a> Write for ByteChunkWriter<'a> {
if self.settings.verbose { if self.settings.verbose {
println!("creating file {}", filename.quote()); println!("creating file {}", filename.quote());
} }
self.inner = self.settings.instantiate_current_writer(&filename)?; self.inner = self.settings.instantiate_current_writer(&filename, true)?;
} }
// If the capacity of this chunk is greater than the number of // If the capacity of this chunk is greater than the number of
@ -872,7 +876,7 @@ impl<'a> LineChunkWriter<'a> {
if settings.verbose { if settings.verbose {
println!("creating file {}", filename.quote()); println!("creating file {}", filename.quote());
} }
let inner = settings.instantiate_current_writer(&filename)?; let inner = settings.instantiate_current_writer(&filename, true)?;
Ok(LineChunkWriter { Ok(LineChunkWriter {
settings, settings,
chunk_size, chunk_size,
@ -907,7 +911,7 @@ impl<'a> Write for LineChunkWriter<'a> {
if self.settings.verbose { if self.settings.verbose {
println!("creating file {}", filename.quote()); println!("creating file {}", filename.quote());
} }
self.inner = self.settings.instantiate_current_writer(&filename)?; self.inner = self.settings.instantiate_current_writer(&filename, true)?;
self.num_lines_remaining_in_current_chunk = self.chunk_size; self.num_lines_remaining_in_current_chunk = self.chunk_size;
} }
@ -979,7 +983,7 @@ impl<'a> LineBytesChunkWriter<'a> {
if settings.verbose { if settings.verbose {
println!("creating file {}", filename.quote()); println!("creating file {}", filename.quote());
} }
let inner = settings.instantiate_current_writer(&filename)?; let inner = settings.instantiate_current_writer(&filename, true)?;
Ok(LineBytesChunkWriter { Ok(LineBytesChunkWriter {
settings, settings,
chunk_size, chunk_size,
@ -1045,7 +1049,7 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
if self.settings.verbose { if self.settings.verbose {
println!("creating file {}", filename.quote()); println!("creating file {}", filename.quote());
} }
self.inner = self.settings.instantiate_current_writer(&filename)?; self.inner = self.settings.instantiate_current_writer(&filename, true)?;
self.num_bytes_remaining_in_current_chunk = self.chunk_size.try_into().unwrap(); self.num_bytes_remaining_in_current_chunk = self.chunk_size.try_into().unwrap();
} }
@ -1134,40 +1138,63 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
struct OutFile { struct OutFile {
filename: String, filename: String,
maybe_writer: Option<BufWriter<Box<dyn Write>>>, maybe_writer: Option<BufWriter<Box<dyn Write>>>,
is_new: bool,
} }
impl OutFile { // impl OutFile {
/// Get the writer for the output file // /// Get the writer for the output file.
/// Instantiate the writer if it has not been instantiated upfront // /// Instantiate the writer if it has not been instantiated upfront
fn get_writer(&mut self, settings: &Settings) -> UResult<&mut BufWriter<Box<dyn Write>>> { // /// or temporarily closed to free up system resources
if self.maybe_writer.is_some() { // fn get_writer(&mut self, settings: &Settings) -> UResult<&mut BufWriter<Box<dyn Write>>> {
Ok(self.maybe_writer.as_mut().unwrap()) // if self.maybe_writer.is_some() {
} else { // Ok(self.maybe_writer.as_mut().unwrap())
// Writer was not instantiated upfront // } else {
// Instantiate it and record for future use // // Writer was not instantiated upfront or was temporarily closed due to system resources constraints.
self.maybe_writer = Some(settings.instantiate_current_writer(self.filename.as_str())?); // // Instantiate it and record for future use.
Ok(self.maybe_writer.as_mut().unwrap()) // self.maybe_writer =
} // Some(settings.instantiate_current_writer(self.filename.as_str(), self.is_new)?);
} // Ok(self.maybe_writer.as_mut().unwrap())
} // }
// }
// }
/// Generate a set of Output Files /// A set of output files
/// This is a helper function to [`n_chunks_by_byte`], [`n_chunks_by_line`] /// Used in [`n_chunks_by_byte`], [`n_chunks_by_line`]
/// and [`n_chunks_by_line_round_robin`]. /// and [`n_chunks_by_line_round_robin`] functions.
type OutFiles = Vec<OutFile>;
trait ManageOutFiles {
/// Initialize a new set of output files
/// Each OutFile is generated with filename, while the writer for it could be /// Each OutFile is generated with filename, while the writer for it could be
/// optional, to be instantiated later by the calling function as needed. /// optional, to be instantiated later by the calling function as needed.
/// Optional writers could happen in [`n_chunks_by_line`] /// Optional writers could happen in the following situations:
/// if `elide_empty_files` parameter is set to `true`. /// * in [`n_chunks_by_line`] if `elide_empty_files` parameter is set to `true`
fn get_out_files( /// * if the number of files is greater than system limit for open files
num_files: u64, fn init(num_files: u64, settings: &Settings, is_writer_optional: bool) -> UResult<Self>
where
Self: Sized;
/// Get the writer for the output file by index.
/// If system limit of open files has been reached
/// it will try to close one of previously instantiated writers
/// to free up resources and re-try instantiating current writer,
/// except for `--filter` mode.
/// The writers that get closed to free up resources for the current writer
/// are flagged as `is_new=false`, so they can be re-opened for appending
/// instead of created anew if we need to keep writing into them later,
/// i.e. in case of round robin distribution as in [`n_chunks_by_line_round_robin`]
fn get_writer(
&mut self,
idx: usize,
settings: &Settings, settings: &Settings,
is_writer_optional: bool, ) -> UResult<&mut BufWriter<Box<dyn Write>>>;
) -> UResult<Vec<OutFile>> { }
impl ManageOutFiles for OutFiles {
fn init(num_files: u64, settings: &Settings, is_writer_optional: bool) -> UResult<Self> {
// This object is responsible for creating the filename for each chunk // This object is responsible for creating the filename for each chunk
let mut filename_iterator: FilenameIterator<'_> = let mut filename_iterator: FilenameIterator<'_> =
FilenameIterator::new(&settings.prefix, &settings.suffix) FilenameIterator::new(&settings.prefix, &settings.suffix)
.map_err(|e| io::Error::new(ErrorKind::Other, format!("{e}")))?; .map_err(|e| io::Error::new(ErrorKind::Other, format!("{e}")))?;
let mut out_files: Vec<OutFile> = Vec::new(); let mut out_files: Self = Self::new();
for _ in 0..num_files { for _ in 0..num_files {
let filename = filename_iterator let filename = filename_iterator
.next() .next()
@ -1175,16 +1202,73 @@ fn get_out_files(
let maybe_writer = if is_writer_optional { let maybe_writer = if is_writer_optional {
None None
} else { } else {
Some(settings.instantiate_current_writer(filename.as_str())?) let instantiated = settings.instantiate_current_writer(filename.as_str(), true);
// If there was an error instantiating the writer for a file,
// it could be due to hitting the system limit of open files,
// so record it as None and let [`get_writer`] function handle closing/re-opening
// of writers as needed within system limits.
// However, for `--filter` child process writers - propagate the error,
// as working around system limits of open files for child shell processes
// is currently not supported (same as in GNU)
match instantiated {
Ok(writer) => Some(writer),
Err(e) if settings.filter.is_some() => {
return Err(e.into());
}
Err(_) => None,
}
}; };
out_files.push(OutFile { out_files.push(OutFile {
filename, filename,
maybe_writer, maybe_writer,
is_new: true,
}); });
} }
Ok(out_files) Ok(out_files)
} }
fn get_writer(
&mut self,
idx: usize,
settings: &Settings,
) -> UResult<&mut BufWriter<Box<dyn Write>>> {
if self[idx].maybe_writer.is_some() {
Ok(self[idx].maybe_writer.as_mut().unwrap())
} else {
// Writer was not instantiated upfront or was temporarily closed due to system resources constraints.
// Instantiate it and record for future use.
let maybe_writer =
settings.instantiate_current_writer(self[idx].filename.as_str(), self[idx].is_new);
if let Ok(writer) = maybe_writer {
self[idx].maybe_writer = Some(writer);
Ok(self[idx].maybe_writer.as_mut().unwrap())
} else if settings.filter.is_some() {
// Propagate error if in `--filter` mode
Err(maybe_writer.err().unwrap().into())
} else {
// Could have hit system limit for open files.
// Try to close one previously instantiated writer first
for (i, out_file) in self.iter_mut().enumerate() {
if i != idx && out_file.maybe_writer.is_some() {
out_file.maybe_writer.as_mut().unwrap().flush()?;
out_file.maybe_writer = None;
out_file.is_new = false;
break;
}
}
// And then try to instantiate the writer again
// If this fails - give up and propagate the error
self[idx].maybe_writer =
Some(settings.instantiate_current_writer(
self[idx].filename.as_str(),
self[idx].is_new,
)?);
Ok(self[idx].maybe_writer.as_mut().unwrap())
}
}
}
}
/// Split a file or STDIN into a specific number of chunks by byte. /// Split a file or STDIN into a specific number of chunks by byte.
/// ///
/// When file size cannot be evenly divided into the number of chunks of the same size, /// When file size cannot be evenly divided into the number of chunks of the same size,
@ -1261,7 +1345,7 @@ where
// In Kth chunk of N mode - we will write to stdout instead of to a file. // In Kth chunk of N mode - we will write to stdout instead of to a file.
let mut stdout_writer = std::io::stdout().lock(); let mut stdout_writer = std::io::stdout().lock();
// In N chunks mode - we will write to `num_chunks` files // In N chunks mode - we will write to `num_chunks` files
let mut out_files: Vec<OutFile> = Vec::new(); let mut out_files: OutFiles = OutFiles::new();
// Calculate chunk size base and modulo reminder // Calculate chunk size base and modulo reminder
// to be used in calculating chunk_size later on // to be used in calculating chunk_size later on
@ -1273,7 +1357,7 @@ where
// This will create each of the underlying files // This will create each of the underlying files
// or stdin pipes to child shell/command processes if in `--filter` mode // or stdin pipes to child shell/command processes if in `--filter` mode
if kth_chunk.is_none() { if kth_chunk.is_none() {
out_files = get_out_files(num_chunks, settings, false)?; out_files = OutFiles::init(num_chunks, settings, false)?;
} }
for i in 1_u64..=num_chunks { for i in 1_u64..=num_chunks {
@ -1317,7 +1401,7 @@ where
} }
None => { None => {
let idx = (i - 1) as usize; let idx = (i - 1) as usize;
let writer = out_files[idx].get_writer(settings)?; let writer = out_files.get_writer(idx, settings)?;
writer.write_all(buf)?; writer.write_all(buf)?;
} }
} }
@ -1387,7 +1471,7 @@ where
// In Kth chunk of N mode - we will write to stdout instead of to a file. // In Kth chunk of N mode - we will write to stdout instead of to a file.
let mut stdout_writer = std::io::stdout().lock(); let mut stdout_writer = std::io::stdout().lock();
// In N chunks mode - we will write to `num_chunks` files // In N chunks mode - we will write to `num_chunks` files
let mut out_files: Vec<OutFile> = Vec::new(); let mut out_files: OutFiles = OutFiles::new();
// Calculate chunk size base and modulo reminder // Calculate chunk size base and modulo reminder
// to be used in calculating `num_bytes_should_be_written` later on // to be used in calculating `num_bytes_should_be_written` later on
@ -1402,7 +1486,7 @@ where
// Otherwise keep writer optional, to be instantiated later if there is data // Otherwise keep writer optional, to be instantiated later if there is data
// to write for the associated chunk. // to write for the associated chunk.
if kth_chunk.is_none() { if kth_chunk.is_none() {
out_files = get_out_files(num_chunks, settings, settings.elide_empty_files)?; out_files = OutFiles::init(num_chunks, settings, settings.elide_empty_files)?;
} }
let mut chunk_number = 1; let mut chunk_number = 1;
@ -1429,7 +1513,7 @@ where
None => { None => {
// Should write into a file // Should write into a file
let idx = (chunk_number - 1) as usize; let idx = (chunk_number - 1) as usize;
let writer = out_files[idx].get_writer(settings)?; let writer = out_files.get_writer(idx, settings)?;
custom_write_all(bytes, writer, settings)?; custom_write_all(bytes, writer, settings)?;
} }
} }
@ -1503,14 +1587,14 @@ where
// In Kth chunk of N mode - we will write to stdout instead of to a file. // In Kth chunk of N mode - we will write to stdout instead of to a file.
let mut stdout_writer = std::io::stdout().lock(); let mut stdout_writer = std::io::stdout().lock();
// In N chunks mode - we will write to `num_chunks` files // In N chunks mode - we will write to `num_chunks` files
let mut out_files: Vec<OutFile> = Vec::new(); let mut out_files: OutFiles = OutFiles::new();
// If in N chunks mode // If in N chunks mode
// Create one writer for each chunk. // Create one writer for each chunk.
// This will create each of the underlying files // This will create each of the underlying files
// or stdin pipes to child shell/command processes if in `--filter` mode // or stdin pipes to child shell/command processes if in `--filter` mode
if kth_chunk.is_none() { if kth_chunk.is_none() {
out_files = get_out_files(num_chunks, settings, false)?; out_files = OutFiles::init(num_chunks, settings, false)?;
} }
let num_chunks: usize = num_chunks.try_into().unwrap(); let num_chunks: usize = num_chunks.try_into().unwrap();
@ -1532,7 +1616,7 @@ where
} }
} }
None => { None => {
let writer = out_files[i % num_chunks].get_writer(settings)?; let writer = out_files.get_writer(i % num_chunks, settings)?;
let writer_stdin_open = custom_write_all(bytes, writer, settings)?; let writer_stdin_open = custom_write_all(bytes, writer, settings)?;
if !writer_stdin_open { if !writer_stdin_open {
closed_writers += 1; closed_writers += 1;