From 30abadcff947feaf9e8c3e968f622bcfdd9b2c1f Mon Sep 17 00:00:00 2001 From: asynts Date: Tue, 18 Aug 2020 18:04:17 +0200 Subject: [PATCH] AK: Add DuplexMemoryStream class. This class is similar to BufferStream because it is possible to both read and write to it. However, it differs in the following ways: - DuplexMemoryStream keeps a history of 64KiB and discards the rest, BufferStream always keeps everything around. - DuplexMemoryStream tracks reading and writing seperately, the following is valid: DuplexMemoryStream stream; stream << 42; int value; stream >> value; For BufferStream it would read: BufferStream stream; stream << 42; int value; stream.seek(0); stream >> value; In the future I would like to replace all usages of BufferStream with InputMemoryStream, OutputMemoryStream (doesn't exist yet) and DuplexMemoryStream. For now I just add DuplexMemoryStream though. --- AK/Forward.h | 2 + AK/Stream.h | 148 +++++++++++++++++++++++++++++++++++++++- AK/Tests/TestStream.cpp | 73 ++++++++++++++++++++ 3 files changed, 222 insertions(+), 1 deletion(-) diff --git a/AK/Forward.h b/AK/Forward.h index 47e8638699..b36bfcad10 100644 --- a/AK/Forward.h +++ b/AK/Forward.h @@ -50,6 +50,7 @@ class Utf32View; class Utf8View; class InputStream; class InputMemoryStream; +class DuplexMemoryStream; template class Span; @@ -125,6 +126,7 @@ using AK::Bytes; using AK::CircularQueue; using AK::DebugLogStream; using AK::DoublyLinkedList; +using AK::DuplexMemoryStream; using AK::FixedArray; using AK::FlyString; using AK::Function; diff --git a/AK/Stream.h b/AK/Stream.h index cd66524cb3..af71773b97 100644 --- a/AK/Stream.h +++ b/AK/Stream.h @@ -26,10 +26,12 @@ #pragma once +#include #include #include #include #include +#include namespace AK::Detail { @@ -52,7 +54,7 @@ protected: namespace AK { -class InputStream : public AK::Detail::Stream { +class InputStream : public virtual AK::Detail::Stream { public: virtual size_t read(Bytes) = 0; virtual bool read_or_error(Bytes) = 0; @@ -60,6 +62,17 @@ public: virtual bool discard_or_error(size_t count) = 0; }; +class OutputStream : public virtual AK::Detail::Stream { +public: + virtual size_t write(ReadonlyBytes) = 0; + virtual bool write_or_error(ReadonlyBytes) = 0; +}; + +class DuplexStream + : public InputStream + , public OutputStream { +}; + #if defined(__cpp_concepts) && !defined(__COVERITY__) template #else @@ -71,8 +84,21 @@ InputStream& operator>>(InputStream& stream, Integral& value) return stream; } +#if defined(__cpp_concepts) && !defined(__COVERITY__) +template +#else +template::value, int>::Type = 0> +#endif +OutputStream& operator<<(OutputStream& stream, Integral value) +{ + stream.write_or_error({ &value, sizeof(value) }); + return stream; +} + #ifndef KERNEL +// FIXME: clang-format adds spaces before the #if for some reason. +// clang-format off #if defined(__cpp_concepts) && !defined(__COVERITY__) template #else @@ -84,7 +110,19 @@ InputStream& operator>>(InputStream& stream, FloatingPoint& value) return stream; } +#if defined(__cpp_concepts) && !defined(__COVERITY__) +template +#else +template::value, int>::Type = 0> #endif +OutputStream& operator<<(OutputStream& stream, FloatingPoint value) +{ + stream.write_or_error({ &value, sizeof(value) }); + return stream; +} + +#endif +// clang-format on inline InputStream& operator>>(InputStream& stream, bool& value) { @@ -92,12 +130,24 @@ inline InputStream& operator>>(InputStream& stream, bool& value) return stream; } +inline OutputStream& operator<<(OutputStream& stream, bool value) +{ + stream.write_or_error({ &value, sizeof(value) }); + return stream; +} + inline InputStream& operator>>(InputStream& stream, Bytes bytes) { stream.read_or_error(bytes); return stream; } +inline OutputStream& operator<<(OutputStream& stream, ReadonlyBytes bytes) +{ + stream.write_or_error(bytes); + return stream; +} + class InputMemoryStream final : public InputStream { friend InputMemoryStream& operator>>(InputMemoryStream& stream, String& string); @@ -224,7 +274,103 @@ private: size_t m_offset { 0 }; }; +// All data written to this stream can be read from it. Reading and writing is done +// using different offsets, meaning that it is not necessary to seek to the start +// before reading; this behaviour differs from BufferStream. +// +// The stream keeps a history of 64KiB which means that seeking backwards is well +// defined. Data past that point will be discarded. +class DuplexMemoryStream final : public DuplexStream { +public: + static constexpr size_t chunk_size = 4 * 1024; + static constexpr size_t history_size = 64 * 1024; + + bool eof() const override { return m_write_offset == m_read_offset; } + + bool discard_or_error(size_t count) override + { + if (m_write_offset - m_read_offset < count) { + m_error = true; + return false; + } + + m_read_offset += count; + try_discard_chunks(); + return true; + } + + size_t read(Bytes bytes) override + { + size_t nread = 0; + while (bytes.size() - nread > 0 && m_write_offset - m_read_offset - nread > 0) { + const auto chunk_index = (m_read_offset - m_base_offset) / chunk_size; + const auto chunk_bytes = m_chunks[chunk_index].bytes().slice(m_read_offset % chunk_size).trim(m_write_offset - m_read_offset - nread); + nread += chunk_bytes.copy_trimmed_to(bytes.slice(nread)); + } + + m_read_offset += nread; + try_discard_chunks(); + return nread; + } + + bool read_or_error(Bytes bytes) override + { + if (m_write_offset - m_read_offset < bytes.size()) { + m_error = true; + return false; + } + + read(bytes); + return true; + } + + size_t write(ReadonlyBytes bytes) override + { + size_t nwritten = 0; + while (bytes.size() - nwritten > 0) { + if ((m_write_offset + nwritten) % chunk_size == 0) + m_chunks.append(ByteBuffer::create_uninitialized(chunk_size)); + + nwritten += bytes.copy_trimmed_to(m_chunks.last().bytes().slice(m_write_offset % chunk_size)); + } + + m_write_offset += nwritten; + return nwritten; + } + + bool write_or_error(ReadonlyBytes bytes) override + { + write(bytes); + return true; + } + + void seek(size_t offset) + { + ASSERT(offset >= m_base_offset); + ASSERT(offset <= m_write_offset); + m_read_offset = offset; + } + + size_t offset() const { return m_read_offset; } + + size_t remaining() const { return m_write_offset - m_read_offset; } + +private: + void try_discard_chunks() + { + while (m_read_offset - m_base_offset >= history_size + chunk_size) { + m_chunks.take_first(); + m_base_offset += chunk_size; + } + } + + Vector m_chunks; + size_t m_write_offset { 0 }; + size_t m_read_offset { 0 }; + size_t m_base_offset { 0 }; +}; } +using AK::DuplexMemoryStream; using AK::InputMemoryStream; using AK::InputStream; diff --git a/AK/Tests/TestStream.cpp b/AK/Tests/TestStream.cpp index 1032dad5ad..a5162885c5 100644 --- a/AK/Tests/TestStream.cpp +++ b/AK/Tests/TestStream.cpp @@ -26,6 +26,7 @@ #include +#include #include static bool compare(ReadonlyBytes lhs, ReadonlyBytes rhs) @@ -108,4 +109,76 @@ TEST_CASE(seeking_slicing_offset) EXPECT(compare({ expected2, sizeof(expected2) }, { actual2, sizeof(actual2) })); } +TEST_CASE(duplex_simple) +{ + DuplexMemoryStream stream; + + EXPECT(stream.eof()); + stream << 42; + EXPECT(!stream.eof()); + + int value; + stream >> value; + EXPECT_EQ(value, 42); + EXPECT(stream.eof()); +} + +TEST_CASE(duplex_seek_into_history) +{ + DuplexMemoryStream stream; + + FixedArray one_kibibyte { 1024 }; + + EXPECT_EQ(stream.remaining(), 0ul); + + for (size_t idx = 0; idx < 256; ++idx) { + stream << one_kibibyte; + } + + EXPECT_EQ(stream.remaining(), 256 * 1024ul); + + for (size_t idx = 0; idx < 128; ++idx) { + stream >> one_kibibyte; + } + + EXPECT_EQ(stream.remaining(), 128 * 1024ul); + + // We now have 128KiB on the stream. Because the stream has a + // history size of 64KiB, we should be able to seek to 64KiB. + static_assert(DuplexMemoryStream::history_size == 64 * 1024); + stream.seek(64 * 1024); + + EXPECT_EQ(stream.remaining(), 192 * 1024ul); + + for (size_t idx = 0; idx < 192; ++idx) { + stream >> one_kibibyte; + } + + EXPECT(stream.eof()); +} + +TEST_CASE(duplex_wild_seeking) +{ + DuplexMemoryStream stream; + + int input0 = 42, input1 = 13, input2 = -12; + int output0, output1, output2; + + stream << input2; + stream << input0 << input1; + stream.seek(0); + stream << input2 << input0; + + stream.seek(4); + stream >> output0 >> output1 >> output2; + + EXPECT(!stream.eof()); + EXPECT_EQ(input0, output0); + EXPECT_EQ(input1, output1); + EXPECT_EQ(input2, output2); + + stream.discard_or_error(4); + EXPECT(stream.eof()); +} + TEST_MAIN(Stream)