1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-07-27 14:47:46 +00:00

LibCore: Use CircularBuffer in BufferedHelper

This patch takes care of a FIXME :^)

Co-Authored-By: Tim Schumacher <timschumi@gmx.de>
This commit is contained in:
Lucas CHOLLET 2022-12-17 13:50:18 +01:00 committed by Andrew Kaster
parent f12e81b74a
commit bf06f49417

View file

@ -7,6 +7,7 @@
#pragma once #pragma once
#include <AK/CircularBuffer.h>
#include <AK/DeprecatedString.h> #include <AK/DeprecatedString.h>
#include <AK/EnumBits.h> #include <AK/EnumBits.h>
#include <AK/Function.h> #include <AK/Function.h>
@ -594,7 +595,7 @@ class BufferedHelper {
public: public:
template<StreamLike U> template<StreamLike U>
BufferedHelper(Badge<U>, NonnullOwnPtr<T> stream, ByteBuffer buffer) BufferedHelper(Badge<U>, NonnullOwnPtr<T> stream, CircularBuffer buffer)
: m_stream(move(stream)) : m_stream(move(stream))
, m_buffer(move(buffer)) , m_buffer(move(buffer))
{ {
@ -603,7 +604,6 @@ public:
BufferedHelper(BufferedHelper&& other) BufferedHelper(BufferedHelper&& other)
: m_stream(move(other.m_stream)) : m_stream(move(other.m_stream))
, m_buffer(move(other.m_buffer)) , m_buffer(move(other.m_buffer))
, m_buffered_size(exchange(other.m_buffered_size, 0))
{ {
} }
@ -611,7 +611,6 @@ public:
{ {
m_stream = move(other.m_stream); m_stream = move(other.m_stream);
m_buffer = move(other.m_buffer); m_buffer = move(other.m_buffer);
m_buffered_size = exchange(other.m_buffered_size, 0);
return *this; return *this;
} }
@ -623,7 +622,7 @@ public:
if (!stream->is_open()) if (!stream->is_open())
return Error::from_errno(ENOTCONN); return Error::from_errno(ENOTCONN);
auto buffer = TRY(ByteBuffer::create_uninitialized(buffer_size)); auto buffer = TRY(CircularBuffer::create_empty(buffer_size));
return adopt_nonnull_own_or_enomem(new BufferedType<T>(move(stream), move(buffer))); return adopt_nonnull_own_or_enomem(new BufferedType<T>(move(stream), move(buffer)));
} }
@ -639,28 +638,11 @@ public:
return Error::from_errno(ENOBUFS); return Error::from_errno(ENOBUFS);
// Fill the internal buffer if it has run dry. // Fill the internal buffer if it has run dry.
if (m_buffered_size == 0) if (m_buffer.used_space() == 0)
TRY(populate_read_buffer()); TRY(populate_read_buffer());
// Let's try to take all we can from the buffer first. // Let's try to take all we can from the buffer first.
size_t buffer_nread = 0; return m_buffer.read(buffer);
if (m_buffered_size > 0) {
// FIXME: Use a circular buffer to avoid shifting the buffer
// contents.
size_t amount_to_take = min(buffer.size(), m_buffered_size);
auto slice_to_take = m_buffer.span().slice(0, amount_to_take);
auto slice_to_shift = m_buffer.span().slice(amount_to_take);
slice_to_take.copy_to(buffer);
buffer_nread += amount_to_take;
if (amount_to_take < m_buffered_size) {
m_buffer.overwrite(0, slice_to_shift.data(), m_buffered_size - amount_to_take);
}
m_buffered_size -= amount_to_take;
}
return Bytes { buffer.data(), buffer_nread };
} }
// Reads into the buffer until \n is encountered. // Reads into the buffer until \n is encountered.
@ -690,7 +672,7 @@ public:
return Bytes {}; return Bytes {};
if (stream().is_eof()) { if (stream().is_eof()) {
if (buffer.size() < m_buffered_size) { if (buffer.size() < m_buffer.used_space()) {
// Normally, reading from an EOFed stream and receiving bytes // Normally, reading from an EOFed stream and receiving bytes
// would mean that the stream is no longer EOF. However, it's // would mean that the stream is no longer EOF. However, it's
// possible with a buffered stream that the user is able to read // possible with a buffered stream that the user is able to read
@ -705,7 +687,7 @@ public:
} }
} }
auto readable_size = min(m_buffered_size, buffer.size()); auto const readable_size = min(m_buffer.used_space(), buffer.size());
// The intention here is to try to match all of the possible // The intention here is to try to match all of the possible
// delimiter candidates and try to find the longest one we can // delimiter candidates and try to find the longest one we can
@ -714,7 +696,7 @@ public:
Optional<size_t> longest_match; Optional<size_t> longest_match;
size_t match_size = 0; size_t match_size = 0;
for (auto& candidate : candidates) { for (auto& candidate : candidates) {
auto result = AK::memmem_optional(m_buffer.data(), readable_size, candidate.bytes().data(), candidate.bytes().size()); auto const result = m_buffer.offset_of(candidate, readable_size);
if (result.has_value()) { if (result.has_value()) {
auto previous_match = longest_match.value_or(*result); auto previous_match = longest_match.value_or(*result);
if ((previous_match < *result) || (previous_match == *result && match_size < candidate.length())) { if ((previous_match < *result) || (previous_match == *result && match_size < candidate.length())) {
@ -724,57 +706,44 @@ public:
} }
} }
if (longest_match.has_value()) { if (longest_match.has_value()) {
auto size_written_to_user_buffer = *longest_match; auto const read_bytes = m_buffer.read(buffer.trim(*longest_match));
auto buffer_to_take = m_buffer.span().slice(0, size_written_to_user_buffer); TRY(m_buffer.discard(match_size));
auto buffer_to_shift = m_buffer.span().slice(size_written_to_user_buffer + match_size); return read_bytes;
buffer_to_take.copy_to(buffer);
m_buffer.overwrite(0, buffer_to_shift.data(), buffer_to_shift.size());
m_buffered_size -= size_written_to_user_buffer + match_size;
return buffer.slice(0, size_written_to_user_buffer);
} }
// If we still haven't found anything, then it's most likely the case // If we still haven't found anything, then it's most likely the case
// that the delimiter ends beyond the length of the caller-passed // that the delimiter ends beyond the length of the caller-passed
// buffer. Let's just fill the caller's buffer up. // buffer. Let's just fill the caller's buffer up.
auto buffer_to_take = m_buffer.span().slice(0, readable_size); return m_buffer.read(buffer);
auto buffer_to_shift = m_buffer.span().slice(readable_size);
buffer_to_take.copy_to(buffer);
m_buffer.overwrite(0, buffer_to_shift.data(), buffer_to_shift.size());
m_buffered_size -= readable_size;
return buffer.slice(0, readable_size);
} }
// Returns whether a line can be read, populating the buffer in the process. // Returns whether a line can be read, populating the buffer in the process.
ErrorOr<bool> can_read_line() ErrorOr<bool> can_read_line()
{ {
if (stream().is_eof() && m_buffered_size > 0) if (stream().is_eof() && m_buffer.used_space() > 0)
return true; return true;
if (m_buffer.span().slice(0, m_buffered_size).contains_slow('\n')) if (m_buffer.offset_of("\n"sv).has_value())
return true; return true;
if (stream().is_eof()) if (stream().is_eof())
return false; return false;
while (m_buffered_size < m_buffer.size()) { while (m_buffer.empty_space() > 0) {
auto populated_slice = TRY(populate_read_buffer()); auto populated_byte_count = TRY(populate_read_buffer());
if (stream().is_eof()) { if (stream().is_eof()) {
// We give the user one last hurrah to read the remaining // We give the user one last hurrah to read the remaining
// contents as a "line". // contents as a "line".
return m_buffered_size > 0; return m_buffer.used_space() > 0;
} }
if (populated_slice.contains_slow('\n')) // FIXME: This currently searches through the buffer from the start,
// even if we just appended a small number of bytes at the end.
if (m_buffer.offset_of("\n"sv).has_value())
return true; return true;
if (populated_slice.is_empty()) if (populated_byte_count == 0)
break; break;
} }
@ -783,7 +752,7 @@ public:
bool is_eof() const bool is_eof() const
{ {
if (m_buffered_size > 0) { if (m_buffer.used_space() > 0) {
return false; return false;
} }
@ -792,26 +761,28 @@ public:
size_t buffer_size() const size_t buffer_size() const
{ {
return m_buffer.size(); return m_buffer.capacity();
} }
size_t buffered_data_size() const size_t buffered_data_size() const
{ {
return m_buffered_size; return m_buffer.used_space();
} }
void clear_buffer() void clear_buffer()
{ {
m_buffered_size = 0; m_buffer.clear();
} }
private: private:
ErrorOr<ReadonlyBytes> populate_read_buffer() ErrorOr<size_t> populate_read_buffer()
{ {
if (m_buffered_size == m_buffer.size()) if (m_buffer.empty_space() == 0)
return ReadonlyBytes {}; return 0;
auto fillable_slice = m_buffer.span().slice(m_buffered_size); // TODO: Figure out if we can do direct writes in a comfortable way.
Array<u8, 1024> temporary_buffer;
auto const fillable_slice = temporary_buffer.span().trim(min(temporary_buffer.size(), m_buffer.empty_space()));
size_t nread = 0; size_t nread = 0;
do { do {
auto result = stream().read(fillable_slice); auto result = stream().read(fillable_slice);
@ -824,24 +795,16 @@ private:
break; break;
return result.error(); return result.error();
} }
auto read_size = result.value().size(); auto const filled_slice = result.value();
m_buffered_size += read_size; VERIFY(m_buffer.write(filled_slice) == filled_slice.size());
nread += read_size; nread += filled_slice.size();
break; break;
} while (true); } while (true);
return fillable_slice.slice(0, nread); return nread;
} }
NonnullOwnPtr<T> m_stream; NonnullOwnPtr<T> m_stream;
// FIXME: Replacing this with a circular buffer would be really nice and CircularBuffer m_buffer;
// would avoid excessive copies; however, right now
// AK::CircularDuplexBuffer inlines its entire contents, and that
// would make for a very large object on the stack.
//
// The proper fix is to make a CircularQueue which uses a buffer on
// the heap.
ByteBuffer m_buffer;
size_t m_buffered_size { 0 };
}; };
// NOTE: A Buffered which accepts any Stream could be added here, but it is not // NOTE: A Buffered which accepts any Stream could be added here, but it is not
@ -887,8 +850,8 @@ public:
virtual ~BufferedSeekable() override = default; virtual ~BufferedSeekable() override = default;
private: private:
BufferedSeekable(NonnullOwnPtr<T> stream, ByteBuffer buffer) BufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer)
: m_helper(Badge<BufferedSeekable<T>> {}, move(stream), buffer) : m_helper(Badge<BufferedSeekable<T>> {}, move(stream), move(buffer))
{ {
} }
@ -954,8 +917,8 @@ public:
virtual ~BufferedSocket() override = default; virtual ~BufferedSocket() override = default;
private: private:
BufferedSocket(NonnullOwnPtr<T> stream, ByteBuffer buffer) BufferedSocket(NonnullOwnPtr<T> stream, CircularBuffer buffer)
: m_helper(Badge<BufferedSocket<T>> {}, move(stream), buffer) : m_helper(Badge<BufferedSocket<T>> {}, move(stream), move(buffer))
{ {
setup_notifier(); setup_notifier();
} }