mirror of
https://github.com/RGBCube/uutils-coreutils
synced 2025-07-28 11:37:44 +00:00
split
: --filter
and stdin updates (#5418)
This commit is contained in:
parent
ad6d7e8a67
commit
eede467e21
2 changed files with 220 additions and 57 deletions
|
@ -748,6 +748,12 @@ enum SettingsError {
|
||||||
/// Multiple different separator characters
|
/// Multiple different separator characters
|
||||||
MultipleSeparatorCharacters,
|
MultipleSeparatorCharacters,
|
||||||
|
|
||||||
|
/// Using `--filter` with `--number` option sub-strategies that print Kth chunk out of N chunks to stdout
|
||||||
|
/// K/N
|
||||||
|
/// l/K/N
|
||||||
|
/// r/K/N
|
||||||
|
FilterWithKthChunkNumber,
|
||||||
|
|
||||||
/// The `--filter` option is not supported on Windows.
|
/// The `--filter` option is not supported on Windows.
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
NotSupported,
|
NotSupported,
|
||||||
|
@ -780,6 +786,9 @@ impl fmt::Display for SettingsError {
|
||||||
"invalid suffix {}, contains directory separator",
|
"invalid suffix {}, contains directory separator",
|
||||||
s.quote()
|
s.quote()
|
||||||
),
|
),
|
||||||
|
Self::FilterWithKthChunkNumber => {
|
||||||
|
write!(f, "--filter does not process a chunk extracted to stdout")
|
||||||
|
}
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
Self::NotSupported => write!(
|
Self::NotSupported => write!(
|
||||||
f,
|
f,
|
||||||
|
@ -850,12 +859,26 @@ impl Settings {
|
||||||
filter: matches.get_one::<String>(OPT_FILTER).map(|s| s.to_owned()),
|
filter: matches.get_one::<String>(OPT_FILTER).map(|s| s.to_owned()),
|
||||||
elide_empty_files: matches.get_flag(OPT_ELIDE_EMPTY_FILES),
|
elide_empty_files: matches.get_flag(OPT_ELIDE_EMPTY_FILES),
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
if result.filter.is_some() {
|
if result.filter.is_some() {
|
||||||
// see https://github.com/rust-lang/rust/issues/29494
|
// see https://github.com/rust-lang/rust/issues/29494
|
||||||
return Err(SettingsError::NotSupported);
|
return Err(SettingsError::NotSupported);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return an error if `--filter` option is used with any of the
|
||||||
|
// Kth chunk sub-strategies of `--number` option
|
||||||
|
// As those are writing to stdout of `split` and cannot write to filter command child process
|
||||||
|
let kth_chunk = matches!(
|
||||||
|
result.strategy,
|
||||||
|
Strategy::Number(NumberType::KthBytes(_, _))
|
||||||
|
| Strategy::Number(NumberType::KthLines(_, _))
|
||||||
|
| Strategy::Number(NumberType::KthRoundRobin(_, _))
|
||||||
|
);
|
||||||
|
if kth_chunk && result.filter.is_some() {
|
||||||
|
return Err(SettingsError::FilterWithKthChunkNumber);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -871,6 +894,47 @@ impl Settings {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// When using `--filter` option, writing to child command process stdin
|
||||||
|
/// could fail with BrokenPipe error
|
||||||
|
/// It can be safely ignored
|
||||||
|
fn ignorable_io_error(error: &std::io::Error, settings: &Settings) -> bool {
|
||||||
|
error.kind() == ErrorKind::BrokenPipe && settings.filter.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Custom wrapper for `write()` method
|
||||||
|
/// Follows similar approach to GNU implementation
|
||||||
|
/// If ignorable io error occurs, return number of bytes as if all bytes written
|
||||||
|
/// Should not be used for Kth chunk number sub-strategies
|
||||||
|
/// as those do not work with `--filter` option
|
||||||
|
fn custom_write<T: Write>(
|
||||||
|
bytes: &[u8],
|
||||||
|
writer: &mut T,
|
||||||
|
settings: &Settings,
|
||||||
|
) -> std::io::Result<usize> {
|
||||||
|
match writer.write(bytes) {
|
||||||
|
Ok(n) => Ok(n),
|
||||||
|
Err(e) if ignorable_io_error(&e, settings) => Ok(bytes.len()),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Custom wrapper for `write_all()` method
|
||||||
|
/// Similar to [`custom_write`], but returns true or false
|
||||||
|
/// depending on if `--filter` stdin is still open (no BrokenPipe error)
|
||||||
|
/// Should not be used for Kth chunk number sub-strategies
|
||||||
|
/// as those do not work with `--filter` option
|
||||||
|
fn custom_write_all<T: Write>(
|
||||||
|
bytes: &[u8],
|
||||||
|
writer: &mut T,
|
||||||
|
settings: &Settings,
|
||||||
|
) -> std::io::Result<bool> {
|
||||||
|
match writer.write_all(bytes) {
|
||||||
|
Ok(()) => Ok(true),
|
||||||
|
Err(e) if ignorable_io_error(&e, settings) => Ok(false),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Write a certain number of bytes to one file, then move on to another one.
|
/// Write a certain number of bytes to one file, then move on to another one.
|
||||||
///
|
///
|
||||||
/// This struct maintains an underlying writer representing the
|
/// This struct maintains an underlying writer representing the
|
||||||
|
@ -935,6 +999,7 @@ impl<'a> ByteChunkWriter<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Write for ByteChunkWriter<'a> {
|
impl<'a> Write for ByteChunkWriter<'a> {
|
||||||
|
/// Implements `--bytes=SIZE`
|
||||||
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
|
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
|
||||||
// If the length of `buf` exceeds the number of bytes remaining
|
// If the length of `buf` exceeds the number of bytes remaining
|
||||||
// in the current chunk, we will need to write to multiple
|
// in the current chunk, we will need to write to multiple
|
||||||
|
@ -966,9 +1031,9 @@ impl<'a> Write for ByteChunkWriter<'a> {
|
||||||
// bytes in `buf`, then write all the bytes in `buf`. Otherwise,
|
// bytes in `buf`, then write all the bytes in `buf`. Otherwise,
|
||||||
// write enough bytes to fill the current chunk, then increment
|
// write enough bytes to fill the current chunk, then increment
|
||||||
// the chunk number and repeat.
|
// the chunk number and repeat.
|
||||||
let n = buf.len();
|
let buf_len = buf.len();
|
||||||
if (n as u64) < self.num_bytes_remaining_in_current_chunk {
|
if (buf_len as u64) < self.num_bytes_remaining_in_current_chunk {
|
||||||
let num_bytes_written = self.inner.write(buf)?;
|
let num_bytes_written = custom_write(buf, &mut self.inner, self.settings)?;
|
||||||
self.num_bytes_remaining_in_current_chunk -= num_bytes_written as u64;
|
self.num_bytes_remaining_in_current_chunk -= num_bytes_written as u64;
|
||||||
return Ok(carryover_bytes_written + num_bytes_written);
|
return Ok(carryover_bytes_written + num_bytes_written);
|
||||||
} else {
|
} else {
|
||||||
|
@ -978,7 +1043,7 @@ impl<'a> Write for ByteChunkWriter<'a> {
|
||||||
// self.num_bytes_remaining_in_current_chunk is lower than
|
// self.num_bytes_remaining_in_current_chunk is lower than
|
||||||
// n, which is already usize.
|
// n, which is already usize.
|
||||||
let i = self.num_bytes_remaining_in_current_chunk as usize;
|
let i = self.num_bytes_remaining_in_current_chunk as usize;
|
||||||
let num_bytes_written = self.inner.write(&buf[..i])?;
|
let num_bytes_written = custom_write(&buf[..i], &mut self.inner, self.settings)?;
|
||||||
self.num_bytes_remaining_in_current_chunk -= num_bytes_written as u64;
|
self.num_bytes_remaining_in_current_chunk -= num_bytes_written as u64;
|
||||||
|
|
||||||
// It's possible that the underlying writer did not
|
// It's possible that the underlying writer did not
|
||||||
|
@ -1064,6 +1129,7 @@ impl<'a> LineChunkWriter<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Write for LineChunkWriter<'a> {
|
impl<'a> Write for LineChunkWriter<'a> {
|
||||||
|
/// Implements `--lines=NUMBER`
|
||||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
// If the number of lines in `buf` exceeds the number of lines
|
// If the number of lines in `buf` exceeds the number of lines
|
||||||
// remaining in the current chunk, we will need to write to
|
// remaining in the current chunk, we will need to write to
|
||||||
|
@ -1092,14 +1158,16 @@ impl<'a> Write for LineChunkWriter<'a> {
|
||||||
// Write the line, starting from *after* the previous
|
// Write the line, starting from *after* the previous
|
||||||
// separator character and ending *after* the current
|
// separator character and ending *after* the current
|
||||||
// separator character.
|
// separator character.
|
||||||
let n = self.inner.write(&buf[prev..i + 1])?;
|
let num_bytes_written =
|
||||||
total_bytes_written += n;
|
custom_write(&buf[prev..i + 1], &mut self.inner, self.settings)?;
|
||||||
|
total_bytes_written += num_bytes_written;
|
||||||
prev = i + 1;
|
prev = i + 1;
|
||||||
self.num_lines_remaining_in_current_chunk -= 1;
|
self.num_lines_remaining_in_current_chunk -= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
let n = self.inner.write(&buf[prev..buf.len()])?;
|
let num_bytes_written =
|
||||||
total_bytes_written += n;
|
custom_write(&buf[prev..buf.len()], &mut self.inner, self.settings)?;
|
||||||
|
total_bytes_written += num_bytes_written;
|
||||||
Ok(total_bytes_written)
|
Ok(total_bytes_written)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1196,6 +1264,8 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
|
||||||
/// |------| |-------| |--------| |---|
|
/// |------| |-------| |--------| |---|
|
||||||
/// aaaaaaaa a\nbbbb\n cccc\ndd\n ee\n
|
/// aaaaaaaa a\nbbbb\n cccc\ndd\n ee\n
|
||||||
/// ```
|
/// ```
|
||||||
|
///
|
||||||
|
/// Implements `--line-bytes=SIZE`
|
||||||
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
|
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
|
||||||
// The total number of bytes written during the loop below.
|
// The total number of bytes written during the loop below.
|
||||||
//
|
//
|
||||||
|
@ -1248,7 +1318,11 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
|
||||||
{
|
{
|
||||||
self.num_bytes_remaining_in_current_chunk = 0;
|
self.num_bytes_remaining_in_current_chunk = 0;
|
||||||
} else {
|
} else {
|
||||||
let num_bytes_written = self.inner.write(&buf[..end.min(buf.len())])?;
|
let num_bytes_written = custom_write(
|
||||||
|
&buf[..end.min(buf.len())],
|
||||||
|
&mut self.inner,
|
||||||
|
self.settings,
|
||||||
|
)?;
|
||||||
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
|
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
|
||||||
total_bytes_written += num_bytes_written;
|
total_bytes_written += num_bytes_written;
|
||||||
buf = &buf[num_bytes_written..];
|
buf = &buf[num_bytes_written..];
|
||||||
|
@ -1261,7 +1335,8 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
|
||||||
// continue to the next iteration. (See chunk 1 in the
|
// continue to the next iteration. (See chunk 1 in the
|
||||||
// example comment above.)
|
// example comment above.)
|
||||||
Some(i) if i < self.num_bytes_remaining_in_current_chunk => {
|
Some(i) if i < self.num_bytes_remaining_in_current_chunk => {
|
||||||
let num_bytes_written = self.inner.write(&buf[..i + 1])?;
|
let num_bytes_written =
|
||||||
|
custom_write(&buf[..i + 1], &mut self.inner, self.settings)?;
|
||||||
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
|
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
|
||||||
total_bytes_written += num_bytes_written;
|
total_bytes_written += num_bytes_written;
|
||||||
buf = &buf[num_bytes_written..];
|
buf = &buf[num_bytes_written..];
|
||||||
|
@ -1279,7 +1354,8 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
|
||||||
== self.chunk_size.try_into().unwrap() =>
|
== self.chunk_size.try_into().unwrap() =>
|
||||||
{
|
{
|
||||||
let end = self.num_bytes_remaining_in_current_chunk;
|
let end = self.num_bytes_remaining_in_current_chunk;
|
||||||
let num_bytes_written = self.inner.write(&buf[..end])?;
|
let num_bytes_written =
|
||||||
|
custom_write(&buf[..end], &mut self.inner, self.settings)?;
|
||||||
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
|
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
|
||||||
total_bytes_written += num_bytes_written;
|
total_bytes_written += num_bytes_written;
|
||||||
buf = &buf[num_bytes_written..];
|
buf = &buf[num_bytes_written..];
|
||||||
|
@ -1315,6 +1391,10 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
|
||||||
///
|
///
|
||||||
/// This function returns an error if there is a problem reading from
|
/// This function returns an error if there is a problem reading from
|
||||||
/// `reader` or writing to one of the output files.
|
/// `reader` or writing to one of the output files.
|
||||||
|
///
|
||||||
|
/// Implements `--number=CHUNKS`
|
||||||
|
/// Where CHUNKS
|
||||||
|
/// * N
|
||||||
fn split_into_n_chunks_by_byte<R>(
|
fn split_into_n_chunks_by_byte<R>(
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
|
@ -1378,29 +1458,26 @@ where
|
||||||
writers.push(writer);
|
writers.push(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Capture the result of the `std::io::copy()` calls to check for
|
// Write `chunk_size` bytes from the reader into each writer
|
||||||
// `BrokenPipe`.
|
// except the last.
|
||||||
let result: std::io::Result<()> = {
|
//
|
||||||
// Write `chunk_size` bytes from the reader into each writer
|
// The last writer gets all remaining bytes so that if the number
|
||||||
// except the last.
|
// of bytes in the input file was not evenly divisible by
|
||||||
//
|
// `num_chunks`, we don't leave any bytes behind.
|
||||||
// The last writer gets all remaining bytes so that if the number
|
for writer in writers.iter_mut().take(num_chunks - 1) {
|
||||||
// of bytes in the input file was not evenly divisible by
|
match io::copy(&mut reader.by_ref().take(chunk_size), writer) {
|
||||||
// `num_chunks`, we don't leave any bytes behind.
|
Ok(_) => continue,
|
||||||
for writer in writers.iter_mut().take(num_chunks - 1) {
|
Err(e) if ignorable_io_error(&e, settings) => continue,
|
||||||
io::copy(&mut reader.by_ref().take(chunk_size), writer)?;
|
Err(e) => return Err(uio_error!(e, "input/output error")),
|
||||||
}
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// Write all the remaining bytes to the last chunk.
|
// Write all the remaining bytes to the last chunk.
|
||||||
let i = num_chunks - 1;
|
let i = num_chunks - 1;
|
||||||
let last_chunk_size = num_bytes - (chunk_size * (num_chunks as u64 - 1));
|
let last_chunk_size = num_bytes - (chunk_size * (num_chunks as u64 - 1));
|
||||||
io::copy(&mut reader.by_ref().take(last_chunk_size), &mut writers[i])?;
|
match io::copy(&mut reader.by_ref().take(last_chunk_size), &mut writers[i]) {
|
||||||
|
|
||||||
Ok(())
|
|
||||||
};
|
|
||||||
match result {
|
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
Err(e) if e.kind() == ErrorKind::BrokenPipe => Ok(()),
|
Err(e) if ignorable_io_error(&e, settings) => Ok(()),
|
||||||
Err(e) => Err(uio_error!(e, "input/output error")),
|
Err(e) => Err(uio_error!(e, "input/output error")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1415,6 +1492,10 @@ where
|
||||||
///
|
///
|
||||||
/// This function returns an error if there is a problem reading from
|
/// This function returns an error if there is a problem reading from
|
||||||
/// `reader` or writing to stdout.
|
/// `reader` or writing to stdout.
|
||||||
|
///
|
||||||
|
/// Implements `--number=CHUNKS`
|
||||||
|
/// Where CHUNKS
|
||||||
|
/// * K/N
|
||||||
fn kth_chunks_by_byte<R>(
|
fn kth_chunks_by_byte<R>(
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
|
@ -1508,6 +1589,10 @@ where
|
||||||
///
|
///
|
||||||
/// * [`kth_chunk_by_line`], which splits its input in the same way,
|
/// * [`kth_chunk_by_line`], which splits its input in the same way,
|
||||||
/// but writes only one specified chunk to stdout.
|
/// but writes only one specified chunk to stdout.
|
||||||
|
///
|
||||||
|
/// Implements `--number=CHUNKS`
|
||||||
|
/// Where CHUNKS
|
||||||
|
/// * l/N
|
||||||
fn split_into_n_chunks_by_line<R>(
|
fn split_into_n_chunks_by_line<R>(
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
|
@ -1518,7 +1603,9 @@ where
|
||||||
{
|
{
|
||||||
// Get the size of the input file in bytes and compute the number
|
// Get the size of the input file in bytes and compute the number
|
||||||
// of bytes per chunk.
|
// of bytes per chunk.
|
||||||
let metadata = metadata(&settings.input).unwrap();
|
let metadata = metadata(&settings.input).map_err(|_| {
|
||||||
|
USimpleError::new(1, format!("{}: cannot determine file size", settings.input))
|
||||||
|
})?;
|
||||||
let num_bytes = metadata.len();
|
let num_bytes = metadata.len();
|
||||||
let chunk_size = (num_bytes / num_chunks) as usize;
|
let chunk_size = (num_bytes / num_chunks) as usize;
|
||||||
|
|
||||||
|
@ -1550,8 +1637,8 @@ where
|
||||||
let maybe_writer = writers.get_mut(i);
|
let maybe_writer = writers.get_mut(i);
|
||||||
let writer = maybe_writer.unwrap();
|
let writer = maybe_writer.unwrap();
|
||||||
let bytes = line.as_slice();
|
let bytes = line.as_slice();
|
||||||
writer.write_all(bytes)?;
|
custom_write_all(bytes, writer, settings)?;
|
||||||
writer.write_all(&[sep])?;
|
custom_write_all(&[sep], writer, settings)?;
|
||||||
|
|
||||||
// Add one byte for the separator character.
|
// Add one byte for the separator character.
|
||||||
let num_bytes = bytes.len() + 1;
|
let num_bytes = bytes.len() + 1;
|
||||||
|
@ -1581,6 +1668,10 @@ where
|
||||||
///
|
///
|
||||||
/// * [`split_into_n_chunks_by_line`], which splits its input in the
|
/// * [`split_into_n_chunks_by_line`], which splits its input in the
|
||||||
/// same way, but writes each chunk to its own file.
|
/// same way, but writes each chunk to its own file.
|
||||||
|
///
|
||||||
|
/// Implements `--number=CHUNKS`
|
||||||
|
/// Where CHUNKS
|
||||||
|
/// * l/K/N
|
||||||
fn kth_chunk_by_line<R>(
|
fn kth_chunk_by_line<R>(
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
|
@ -1592,7 +1683,9 @@ where
|
||||||
{
|
{
|
||||||
// Get the size of the input file in bytes and compute the number
|
// Get the size of the input file in bytes and compute the number
|
||||||
// of bytes per chunk.
|
// of bytes per chunk.
|
||||||
let metadata = metadata(&settings.input).unwrap();
|
let metadata = metadata(&settings.input).map_err(|_| {
|
||||||
|
USimpleError::new(1, format!("{}: cannot determine file size", settings.input))
|
||||||
|
})?;
|
||||||
let num_bytes = metadata.len();
|
let num_bytes = metadata.len();
|
||||||
let chunk_size = (num_bytes / num_chunks) as usize;
|
let chunk_size = (num_bytes / num_chunks) as usize;
|
||||||
|
|
||||||
|
@ -1601,7 +1694,7 @@ where
|
||||||
let mut writer = stdout.lock();
|
let mut writer = stdout.lock();
|
||||||
|
|
||||||
let mut num_bytes_remaining_in_current_chunk = chunk_size;
|
let mut num_bytes_remaining_in_current_chunk = chunk_size;
|
||||||
let mut i = 0;
|
let mut i = 1;
|
||||||
let sep = settings.separator;
|
let sep = settings.separator;
|
||||||
for line_result in reader.split(sep) {
|
for line_result in reader.split(sep) {
|
||||||
let line = line_result?;
|
let line = line_result?;
|
||||||
|
@ -1628,11 +1721,32 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Split a file into a specific number of chunks by line, but
|
||||||
|
/// assign lines via round-robin
|
||||||
|
///
|
||||||
|
/// This function always creates one output file for each chunk, even
|
||||||
|
/// if there is an error reading or writing one of the chunks or if
|
||||||
|
/// the input file is truncated. However, if the `filter` option is
|
||||||
|
/// being used, then no files are created.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// This function returns an error if there is a problem reading from
|
||||||
|
/// `reader` or writing to one of the output files.
|
||||||
|
///
|
||||||
|
/// # See also
|
||||||
|
///
|
||||||
|
/// * [`split_into_n_chunks_by_line`], which splits its input in the same way,
|
||||||
|
/// but without round robin distribution.
|
||||||
|
///
|
||||||
|
/// Implements `--number=CHUNKS`
|
||||||
|
/// Where CHUNKS
|
||||||
|
/// * r/N
|
||||||
fn split_into_n_chunks_by_line_round_robin<R>(
|
fn split_into_n_chunks_by_line_round_robin<R>(
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
num_chunks: u64,
|
num_chunks: u64,
|
||||||
) -> std::io::Result<()>
|
) -> UResult<()>
|
||||||
where
|
where
|
||||||
R: BufRead,
|
R: BufRead,
|
||||||
{
|
{
|
||||||
|
@ -1659,13 +1773,21 @@ where
|
||||||
|
|
||||||
let num_chunks: usize = num_chunks.try_into().unwrap();
|
let num_chunks: usize = num_chunks.try_into().unwrap();
|
||||||
let sep = settings.separator;
|
let sep = settings.separator;
|
||||||
|
let mut closed_writers = 0;
|
||||||
for (i, line_result) in reader.split(sep).enumerate() {
|
for (i, line_result) in reader.split(sep).enumerate() {
|
||||||
let line = line_result.unwrap();
|
|
||||||
let maybe_writer = writers.get_mut(i % num_chunks);
|
let maybe_writer = writers.get_mut(i % num_chunks);
|
||||||
let writer = maybe_writer.unwrap();
|
let writer = maybe_writer.unwrap();
|
||||||
|
let mut line = line_result.unwrap();
|
||||||
|
line.push(sep);
|
||||||
let bytes = line.as_slice();
|
let bytes = line.as_slice();
|
||||||
writer.write_all(bytes)?;
|
let writer_stdin_open = custom_write_all(bytes, writer, settings)?;
|
||||||
writer.write_all(&[sep])?;
|
if !writer_stdin_open {
|
||||||
|
closed_writers += 1;
|
||||||
|
if closed_writers == num_chunks {
|
||||||
|
// all writers are closed - stop reading
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1689,6 +1811,10 @@ where
|
||||||
///
|
///
|
||||||
/// * [`split_into_n_chunks_by_line_round_robin`], which splits its input in the
|
/// * [`split_into_n_chunks_by_line_round_robin`], which splits its input in the
|
||||||
/// same way, but writes each chunk to its own file.
|
/// same way, but writes each chunk to its own file.
|
||||||
|
///
|
||||||
|
/// Implements `--number=CHUNKS`
|
||||||
|
/// Where CHUNKS
|
||||||
|
/// * r/K/N
|
||||||
fn kth_chunk_by_line_round_robin<R>(
|
fn kth_chunk_by_line_round_robin<R>(
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
|
@ -1705,6 +1831,10 @@ where
|
||||||
let num_chunks: usize = num_chunks.try_into().unwrap();
|
let num_chunks: usize = num_chunks.try_into().unwrap();
|
||||||
let chunk_number: usize = chunk_number.try_into().unwrap();
|
let chunk_number: usize = chunk_number.try_into().unwrap();
|
||||||
let sep = settings.separator;
|
let sep = settings.separator;
|
||||||
|
// The chunk number is given as a 1-indexed number, but it
|
||||||
|
// is a little easier to deal with a 0-indexed number
|
||||||
|
// since `.enumerate()` returns index `i` starting with 0
|
||||||
|
let chunk_number = chunk_number - 1;
|
||||||
for (i, line_result) in reader.split(sep).enumerate() {
|
for (i, line_result) in reader.split(sep).enumerate() {
|
||||||
let line = line_result?;
|
let line = line_result?;
|
||||||
let bytes = line.as_slice();
|
let bytes = line.as_slice();
|
||||||
|
@ -1741,24 +1871,12 @@ fn split(settings: &Settings) -> UResult<()> {
|
||||||
split_into_n_chunks_by_line(settings, &mut reader, num_chunks)
|
split_into_n_chunks_by_line(settings, &mut reader, num_chunks)
|
||||||
}
|
}
|
||||||
Strategy::Number(NumberType::KthLines(chunk_number, num_chunks)) => {
|
Strategy::Number(NumberType::KthLines(chunk_number, num_chunks)) => {
|
||||||
// The chunk number is given as a 1-indexed number, but it
|
|
||||||
// is a little easier to deal with a 0-indexed number.
|
|
||||||
let chunk_number = chunk_number - 1;
|
|
||||||
kth_chunk_by_line(settings, &mut reader, chunk_number, num_chunks)
|
kth_chunk_by_line(settings, &mut reader, chunk_number, num_chunks)
|
||||||
}
|
}
|
||||||
Strategy::Number(NumberType::RoundRobin(num_chunks)) => {
|
Strategy::Number(NumberType::RoundRobin(num_chunks)) => {
|
||||||
match split_into_n_chunks_by_line_round_robin(settings, &mut reader, num_chunks) {
|
split_into_n_chunks_by_line_round_robin(settings, &mut reader, num_chunks)
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
ErrorKind::BrokenPipe => Ok(()),
|
|
||||||
_ => Err(USimpleError::new(1, format!("{e}"))),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Strategy::Number(NumberType::KthRoundRobin(chunk_number, num_chunks)) => {
|
Strategy::Number(NumberType::KthRoundRobin(chunk_number, num_chunks)) => {
|
||||||
// The chunk number is given as a 1-indexed number, but it
|
|
||||||
// is a little easier to deal with a 0-indexed number.
|
|
||||||
let chunk_number = chunk_number - 1;
|
|
||||||
kth_chunk_by_line_round_robin(settings, &mut reader, chunk_number, num_chunks)
|
kth_chunk_by_line_round_robin(settings, &mut reader, chunk_number, num_chunks)
|
||||||
}
|
}
|
||||||
Strategy::Lines(chunk_size) => {
|
Strategy::Lines(chunk_size) => {
|
||||||
|
@ -1775,7 +1893,6 @@ fn split(settings: &Settings) -> UResult<()> {
|
||||||
// indicate that. A special error message needs to be
|
// indicate that. A special error message needs to be
|
||||||
// printed in that case.
|
// printed in that case.
|
||||||
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
|
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
|
||||||
ErrorKind::BrokenPipe => Ok(()),
|
|
||||||
_ => Err(uio_error!(e, "input/output error")),
|
_ => Err(uio_error!(e, "input/output error")),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -1794,7 +1911,6 @@ fn split(settings: &Settings) -> UResult<()> {
|
||||||
// indicate that. A special error message needs to be
|
// indicate that. A special error message needs to be
|
||||||
// printed in that case.
|
// printed in that case.
|
||||||
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
|
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
|
||||||
ErrorKind::BrokenPipe => Ok(()),
|
|
||||||
_ => Err(uio_error!(e, "input/output error")),
|
_ => Err(uio_error!(e, "input/output error")),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -1813,7 +1929,6 @@ fn split(settings: &Settings) -> UResult<()> {
|
||||||
// indicate that. A special error message needs to be
|
// indicate that. A special error message needs to be
|
||||||
// printed in that case.
|
// printed in that case.
|
||||||
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
|
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
|
||||||
ErrorKind::BrokenPipe => Ok(()),
|
|
||||||
_ => Err(uio_error!(e, "input/output error")),
|
_ => Err(uio_error!(e, "input/output error")),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -338,6 +338,36 @@ fn test_filter_broken_pipe() {
|
||||||
.succeeds();
|
.succeeds();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(unix)]
|
||||||
|
fn test_filter_with_kth_chunk() {
|
||||||
|
let scene = TestScenario::new(util_name!());
|
||||||
|
scene
|
||||||
|
.ucmd()
|
||||||
|
.args(&["--filter='some'", "--number=1/2"])
|
||||||
|
.ignore_stdin_write_error()
|
||||||
|
.pipe_in("a\n")
|
||||||
|
.fails()
|
||||||
|
.no_stdout()
|
||||||
|
.stderr_contains("--filter does not process a chunk extracted to stdout");
|
||||||
|
scene
|
||||||
|
.ucmd()
|
||||||
|
.args(&["--filter='some'", "--number=l/1/2"])
|
||||||
|
.ignore_stdin_write_error()
|
||||||
|
.pipe_in("a\n")
|
||||||
|
.fails()
|
||||||
|
.no_stdout()
|
||||||
|
.stderr_contains("--filter does not process a chunk extracted to stdout");
|
||||||
|
scene
|
||||||
|
.ucmd()
|
||||||
|
.args(&["--filter='some'", "--number=r/1/2"])
|
||||||
|
.ignore_stdin_write_error()
|
||||||
|
.pipe_in("a\n")
|
||||||
|
.fails()
|
||||||
|
.no_stdout()
|
||||||
|
.stderr_contains("--filter does not process a chunk extracted to stdout");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_split_lines_number() {
|
fn test_split_lines_number() {
|
||||||
// Test if stdout/stderr for '--lines' option is correct
|
// Test if stdout/stderr for '--lines' option is correct
|
||||||
|
@ -699,6 +729,24 @@ fn test_split_stdin_num_kth_chunk() {
|
||||||
.stderr_only("split: -: cannot determine file size\n");
|
.stderr_only("split: -: cannot determine file size\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_split_stdin_num_line_chunks() {
|
||||||
|
new_ucmd!()
|
||||||
|
.args(&["--number=l/2"])
|
||||||
|
.fails()
|
||||||
|
.code_is(1)
|
||||||
|
.stderr_only("split: -: cannot determine file size\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_split_stdin_num_kth_line_chunk() {
|
||||||
|
new_ucmd!()
|
||||||
|
.args(&["--number=l/2/5"])
|
||||||
|
.fails()
|
||||||
|
.code_is(1)
|
||||||
|
.stderr_only("split: -: cannot determine file size\n");
|
||||||
|
}
|
||||||
|
|
||||||
fn file_read(at: &AtPath, filename: &str) -> String {
|
fn file_read(at: &AtPath, filename: &str) -> String {
|
||||||
let mut s = String::new();
|
let mut s = String::new();
|
||||||
at.open(filename).read_to_string(&mut s).unwrap();
|
at.open(filename).read_to_string(&mut s).unwrap();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue