From 6b13436ef64b8ae91f71b3fed2e8555f733d7998 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?kleines=20Filmr=C3=B6llchen?=
Date: Sun, 23 Jan 2022 23:31:51 +0100
Subject: [PATCH] LibCore: Introduce SharedSingleProducerCircularQueue

This new class with an admittedly long OOP-y name provides a circular
queue in shared memory. The queue is a lock-free synchronous queue
implemented with atomics, and its implementation is significantly
simplified by only accounting for one producer (and multiple
consumers). It is intended to be used as a producer-consumer
communication data structure across processes. The original motivation
behind this class is efficient short-period transfer of audio data in
userspace.

This class includes formal proofs of several correctness properties of
the main queue operations `enqueue` and `dequeue`. These proofs are not
100% complete in their existing form, as the invariants they depend on
are "handwaved". This seems fine to me right now, as any proof is
better than no proof :^). Anyway, the proofs should build confidence
that the implemented algorithms, which are only roughly based on
existing work, operate correctly in even the worst-case concurrency
scenarios.
---
 AK/Debug.h.in                                 |   4 +
 Meta/CMake/all_the_debug_macros.cmake         |   1 +
 Tests/LibCore/CMakeLists.txt                  |   2 +
 ...bCoreSharedSingleProducerCircularQueue.cpp | 203 ++++++++++++++++
 .../Libraries/LibCore/SharedCircularQueue.h   | 230 ++++++++++++++++++
 5 files changed, 440 insertions(+)
 create mode 100644 Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
 create mode 100644 Userland/Libraries/LibCore/SharedCircularQueue.h

diff --git a/AK/Debug.h.in b/AK/Debug.h.in
index a4693ffa6c..2861c2e210 100644
--- a/AK/Debug.h.in
+++ b/AK/Debug.h.in
@@ -386,6 +386,10 @@
 #cmakedefine01 SH_LANGUAGE_SERVER_DEBUG
 #endif
 
+#ifndef SHARED_QUEUE_DEBUG
+#cmakedefine01 SHARED_QUEUE_DEBUG
+#endif
+
 #ifndef SHELL_JOB_DEBUG
 #cmakedefine01 SHELL_JOB_DEBUG
 #endif
diff --git a/Meta/CMake/all_the_debug_macros.cmake b/Meta/CMake/all_the_debug_macros.cmake
index 6f3bcf3350..923d54e306 100644
--- a/Meta/CMake/all_the_debug_macros.cmake
+++ b/Meta/CMake/all_the_debug_macros.cmake
@@ -164,6 +164,7 @@ set(SERVICE_DEBUG ON)
 set(SH_DEBUG ON)
 set(SHELL_JOB_DEBUG ON)
 set(SH_LANGUAGE_SERVER_DEBUG ON)
+set(SHARED_QUEUE_DEBUG ON)
 set(SIGNAL_DEBUG ON)
 set(SLAVEPTY_DEBUG ON)
 set(SMP_DEBUG ON)
diff --git a/Tests/LibCore/CMakeLists.txt b/Tests/LibCore/CMakeLists.txt
index 922547c80f..e099250646 100644
--- a/Tests/LibCore/CMakeLists.txt
+++ b/Tests/LibCore/CMakeLists.txt
@@ -5,6 +5,7 @@ set(TEST_SOURCES
     TestLibCoreDeferredInvoke.cpp
     TestLibCoreStream.cpp
     TestLibCoreFilePermissionsMask.cpp
+    TestLibCoreSharedSingleProducerCircularQueue.cpp
 )
 
 foreach(source IN LISTS TEST_SOURCES)
@@ -13,5 +14,6 @@ endforeach()
 
 # NOTE: Required because of the LocalServer tests
 target_link_libraries(TestLibCoreStream LibThreading)
+target_link_libraries(TestLibCoreSharedSingleProducerCircularQueue LibThreading)
 
 install(FILES long_lines.txt 10kb.txt small.txt DESTINATION usr/Tests/LibCore)
diff --git a/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp b/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
new file mode 100644
index 0000000000..c691ee972e
--- /dev/null
+++ b/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2022, kleines Filmröllchen
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include "sched.h"
+#include <LibCore/SharedCircularQueue.h>
+#include <LibTest/TestCase.h>
+#include <LibThreading/Thread.h>
+
+using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
+using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
+
+Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
+
+// These first two cases don't multithread at all.
+
+TEST_CASE(simple_enqueue)
+{
+    auto queue = MUST(TestQueue::try_create());
+    for (size_t i = 0; i < queue.size() - 1; ++i)
+        EXPECT(!queue.try_enqueue((int)i).is_error());
+
+    auto result = queue.try_enqueue(0);
+    EXPECT(result.is_error());
+    EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
+}
+
+TEST_CASE(simple_dequeue)
+{
+    auto queue = MUST(TestQueue::try_create());
+    auto const test_count = 10;
+    for (int i = 0; i < test_count; ++i)
+        (void)queue.try_enqueue(i);
+    for (int i = 0; i < test_count; ++i) {
+        auto const element = queue.try_dequeue();
+        EXPECT(!element.is_error());
+        EXPECT_EQ(element.value(), i);
+    }
+}
+
+// There is one parallel consumer, but nobody is producing at the same time.
+TEST_CASE(simple_multithread)
+{
+    auto queue = MUST(TestQueue::try_create());
+    auto const test_count = 10;
+
+    for (int i = 0; i < test_count; ++i)
+        (void)queue.try_enqueue(i);
+
+    auto second_thread = Threading::Thread::construct([&queue]() {
+        auto copied_queue = queue;
+        for (int i = 0; i < test_count; ++i) {
+            QueueError result = TestQueue::QueueStatus::Invalid;
+            do {
+                result = copied_queue.try_dequeue();
+                if (!result.is_error())
+                    EXPECT_EQ(result.value(), i);
+            } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
+
+            if (result.is_error())
+                FAIL("Unexpected error while dequeueing.");
+        }
+        return 0;
+    });
+    second_thread->start();
+    (void)second_thread->join();
+
+    EXPECT_EQ(queue.weak_used(), (size_t)0);
+}
+
+// There is one parallel consumer and one parallel producer.
+TEST_CASE(producer_consumer_multithread)
+{
+    auto queue = MUST(TestQueue::try_create());
+    // Ensure that we have the possibility of filling the queue up.
+    auto const test_count = queue.size() * 4;
+
+    Atomic<bool> other_thread_running { false };
+
+    auto second_thread = Threading::Thread::construct([&queue, &other_thread_running, test_count]() {
+        auto copied_queue = queue;
+        other_thread_running.store(true);
+        for (size_t i = 0; i < test_count; ++i) {
+            QueueError result = TestQueue::QueueStatus::Invalid;
+            do {
+                result = copied_queue.try_dequeue();
+                if (!result.is_error())
+                    EXPECT_EQ(result.value(), (int)i);
+            } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
+
+            if (result.is_error())
+                FAIL("Unexpected error while dequeueing.");
+        }
+        return 0;
+    });
+    second_thread->start();
+
+    while (!other_thread_running.load())
+        ;
+
+    for (size_t i = 0; i < test_count; ++i) {
+        ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
+        do {
+            result = queue.try_enqueue((int)i);
+        } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
+
+        if (result.is_error())
+            FAIL("Unexpected error while enqueueing.");
+    }
+
+    (void)second_thread->join();
+
+    EXPECT_EQ(queue.weak_used(), (size_t)0);
+}
+
+// There are multiple parallel consumers, but nobody is producing at the same time.
+TEST_CASE(multi_consumer)
+{
+    auto queue = MUST(TestQueue::try_create());
+    // This needs to be divisible by 4!
+    size_t const test_count = queue.size() - 4;
+    Atomic<size_t> dequeue_count = 0;
+
+    auto threads = {
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+    };
+
+    for (size_t i = 0; i < test_count; ++i)
+        (void)queue.try_enqueue((int)i);
+
+    for (auto thread : threads)
+        thread->start();
+    for (auto thread : threads)
+        (void)thread->join();
+
+    EXPECT_EQ(queue.weak_used(), (size_t)0);
+    EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
+}
+
+// There are multiple parallel consumers and one parallel producer.
+TEST_CASE(single_producer_multi_consumer)
+{
+    auto queue = MUST(TestQueue::try_create());
+    // Choose a higher number to provoke possible race conditions.
+    size_t const test_count = queue.size() * 8;
+    Atomic<size_t> dequeue_count = 0;
+
+    auto threads = {
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+    };
+    for (auto thread : threads)
+        thread->start();
+
+    for (size_t i = 0; i < test_count; ++i) {
+        ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
+        do {
+            result = queue.try_enqueue((int)i);
+            // After we put something in the first time, let's wait while nobody has dequeued yet.
+            while (dequeue_count.load() == 0)
+                ;
+            // Give others time to do something.
+            sched_yield();
+        } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
+
+        if (result.is_error())
+            FAIL("Unexpected error while enqueueing.");
+    }
+
+    for (auto thread : threads)
+        (void)thread->join();
+
+    EXPECT_EQ(queue.weak_used(), (size_t)0);
+    EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
+}
+
+Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
+{
+    return [&queue, &dequeue_count, test_count]() {
+        auto copied_queue = queue;
+        for (size_t i = 0; i < test_count / 4; ++i) {
+            QueueError result = TestQueue::QueueStatus::Invalid;
+            do {
+                result = copied_queue.try_dequeue();
+                if (!result.is_error())
+                    dequeue_count.fetch_add(1);
+                // Give others time to do something.
+                sched_yield();
+            } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
+
+            if (result.is_error())
+                FAIL("Unexpected error while dequeueing.");
+        }
+        return (intptr_t)0;
+    };
+}
diff --git a/Userland/Libraries/LibCore/SharedCircularQueue.h b/Userland/Libraries/LibCore/SharedCircularQueue.h
new file mode 100644
index 0000000000..fa4bb01edf
--- /dev/null
+++ b/Userland/Libraries/LibCore/SharedCircularQueue.h
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2022, kleines Filmröllchen
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <AK/Array.h>
+#include <AK/Atomic.h>
+#include <AK/BuiltinWrappers.h>
+#include <AK/Debug.h>
+#include <AK/Error.h>
+#include <AK/Format.h>
+#include <AK/Function.h>
+#include <AK/NumericLimits.h>
+#include <AK/Platform.h>
+#include <AK/RefCounted.h>
+#include <AK/RefPtr.h>
+#include <AK/String.h>
+#include <AK/Types.h>
+#include <LibCore/System.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+
+namespace Core {
+
+// A circular lock-free queue (or a buffer) with a single producer,
+// residing in shared memory and designed to be accessible to multiple processes.
+// This implementation makes use of the fact that any producer-related code can be sure that
+// it's the only producer-related code that is running, which simplifies a bunch of the synchronization code.
+// The exclusivity and liveness of critical sections in this class are proven correct
+// under the assumption of correct synchronization primitives, i.e. atomics.
+// In many circumstances, this is enough for cross-process queues.
+// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory.
+// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user.
+// FIXME: Make this independent of shared memory, so that we can move it to AK.
+// clang-format off
+template<typename T, size_t Size = 32>
+// Size must be a power of two, which speeds up the modulus operations for indexing.
+requires(popcount(Size) == 1)
+class SharedSingleProducerCircularQueue final {
+    // clang-format on
+
+public:
+    using ValueType = T;
+
+    enum class QueueStatus : u8 {
+        Invalid = 0,
+        Full,
+        Empty,
+    };
+
+    SharedSingleProducerCircularQueue() = default;
+    SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue& queue) = default;
+
+    SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default;
+    SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;
+
+    // Allocates a new circular queue in shared memory.
+    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create()
+    {
+        auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
+        return try_create_internal(fd, true);
+    }
+
+    // Uses an existing circular queue from the given shared memory.
+    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create(int fd)
+    {
+        return try_create_internal(fd, false);
+    }
+
+    constexpr size_t size() const { return Size; }
+    // These functions are provably inconsistent and should only be used as hints to the actual capacity and used count.
+    ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); }
+    ALWAYS_INLINE size_t weak_used() const
+    {
+        auto volatile tail = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed);
+        auto volatile head = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed);
+        return tail - head;
+    }
+
+    ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; }
+    ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }
+
+    ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
+    ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }
+
+    ErrorOr<void, QueueStatus> try_enqueue(ValueType to_insert)
+    {
+        VERIFY(!m_queue.is_null());
+        if (!can_enqueue())
+            return QueueStatus::Full;
+        auto our_tail = m_queue->m_queue->m_tail.load() % Size;
+        m_queue->m_queue->m_data[our_tail] = to_insert;
+        ++m_queue->m_queue->m_tail;
+
+        return {};
+    }
+
+    ALWAYS_INLINE bool can_enqueue() const
+    {
+        return ((head() - 1) % Size) != (m_queue->m_queue->m_tail.load() % Size);
+    }
+
+    // Repeatedly tries to enqueue, using wait_function to wait whenever the queue is full.
+    ErrorOr<void> try_blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
+    {
+        ErrorOr<void, QueueStatus> result;
+        while (true) {
+            result = try_enqueue(to_insert);
+
+            if (result.is_error()) {
+                if (result.error() == QueueStatus::Full)
+                    wait_function();
+                else
+                    return Error::from_string_literal("Unexpected error while enqueuing"sv);
+            } else {
+                break;
+            }
+        }
+        return {};
+    }
+
+    ErrorOr<ValueType, QueueStatus> try_dequeue()
+    {
+        VERIFY(!m_queue.is_null());
+        while (true) {
+            // The >= is not strictly necessary, but it feels safer :^)
+            if (head() >= m_queue->m_queue->m_tail.load())
+                return QueueStatus::Empty;
+
+            // This CAS only succeeds if nobody is currently dequeuing.
+            auto size_max = NumericLimits<size_t>::max();
+            if (m_queue->m_queue->m_head_protector.compare_exchange_strong(size_max, m_queue->m_queue->m_head.load())) {
+                auto old_head = m_queue->m_queue->m_head.load();
+                auto data = move(m_queue->m_queue->m_data[old_head % Size]);
+                m_queue->m_queue->m_head.fetch_add(1);
+                m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
+                return { move(data) };
+            }
+        }
+    }
+
+    // The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing.
+    size_t head() const
+    {
+        return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load());
+    }
+
+private:
+    struct SharedMemorySPCQ {
+        SharedMemorySPCQ() = default;
+        SharedMemorySPCQ(SharedMemorySPCQ const&) = delete;
+        SharedMemorySPCQ(SharedMemorySPCQ&&) = delete;
+        ~SharedMemorySPCQ() = default;
+
+        // Invariant: tail >= head
+        // Invariant: head and tail are monotonically increasing
+        // Invariant: tail always points to the next free location where an enqueue can happen.
+        // Invariant: head always points to the element to be dequeued next.
+        // Invariant: tail is only modified by enqueue functions.
+        // Invariant: head is only modified by dequeue functions.
+        // An empty queue is signalled with: tail = head
+        // A full queue is signalled with: head - 1 mod size = tail mod size (i.e. head and tail point to the same index in the data array)
+        // FIXME: These invariants aren't proven to be correct after each successful completion of each operation where it is relevant.
+        // The work could be put in, but for now I think the algorithmic correctness proofs of the functions are enough.
+        CACHE_ALIGNED Atomic<size_t> m_tail { 0 };
+        CACHE_ALIGNED Atomic<size_t> m_head { 0 };
+        CACHE_ALIGNED Atomic<size_t> m_head_protector { NumericLimits<size_t>::max() };
+
+        alignas(ValueType) Array<ValueType, Size> m_data;
+    };
+
+    class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> {
+        friend class SharedSingleProducerCircularQueue;
+
+    public:
+        SharedMemorySPCQ* m_queue;
+        void* m_raw;
+        int m_fd;
+
+        ~RefCountedSharedMemorySPCQ()
+        {
+            MUST(System::close(m_fd));
+            MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
+            dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
+        }
+
+    private:
+        RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd)
+            : m_queue(queue)
+            , m_raw(reinterpret_cast<void*>(queue))
+            , m_fd(fd)
+        {
+        }
+    };
+
+    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create_internal(int fd, bool is_new)
+    {
+        auto name = String::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
+        auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
+        dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);
+
+        SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);
+
+        if (!shared_queue)
+            return Error::from_string_literal("Unexpected error when creating shared queue from raw memory"sv);
+
+        return SharedSingleProducerCircularQueue { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) };
+    }
+
+    SharedSingleProducerCircularQueue(String name, RefPtr<RefCountedSharedMemorySPCQ> queue)
+        : m_queue(queue)
+        , m_name(move(name))
+    {
+    }
+
+    RefPtr<RefCountedSharedMemorySPCQ> m_queue;
+
+    String m_name {};
+};
+
+}
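For readers coming to this API fresh, the following is a minimal usage sketch based only on what this patch introduces; it is not part of the patch itself and assumes a SerenityOS build with LibCore and LibThreading available. It keeps both ends in one process by copying the queue handle into a thread, exactly like the tests above; in the intended cross-process setup the producer would instead send queue.fd() over IPC and the consumer would attach with try_create(fd). The wait function given to try_blocking_enqueue is just sched_yield here; any backoff strategy works.

    #include <AK/Format.h>
    #include <AK/Try.h>
    #include <LibCore/SharedCircularQueue.h>
    #include <LibThreading/Thread.h>
    #include <sched.h>

    int main()
    {
        // One producer, default (power-of-two) capacity.
        auto queue = MUST(Core::SharedSingleProducerCircularQueue<int>::try_create());

        // Consumer side. A real consumer would live in another process, receive
        // queue.fd() over IPC, and attach with try_create(fd); here we simply copy
        // the handle into a thread, like the tests above do.
        auto consumer = Threading::Thread::construct([&queue]() {
            auto copied_queue = queue; // both handles refer to the same shared memory
            for (int received = 0; received < 100;) {
                auto result = copied_queue.try_dequeue();
                if (result.is_error()) {
                    sched_yield(); // empty right now, try again later
                    continue;
                }
                dbgln("dequeued {}", result.value());
                ++received;
            }
            return (intptr_t)0;
        });
        consumer->start();

        // Producer side: back off (by yielding) whenever the queue is full.
        for (int i = 0; i < 100; ++i)
            MUST(queue.try_blocking_enqueue(i, [] { sched_yield(); }));

        (void)consumer->join();
        return 0;
    }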
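One consequence of the full/empty conditions documented in SharedMemorySPCQ is easy to miss when reading the tests: because "full" is detected as (head - 1) mod Size == tail mod Size, one slot always stays unused, so a queue with Size slots holds at most Size - 1 elements at a time. Worked through with Size = 8 and head = 0: after seven successful enqueues tail = 7, and (0 - 1) mod 8 = 7 = 7 mod 8, so the eighth try_enqueue reports QueueStatus::Full. This is why simple_enqueue only expects queue.size() - 1 insertions to succeed, and why weak_remaining_capacity(), computed as Size - weak_used(), is an upper-bound hint rather than the exact number of enqueues that can still succeed.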
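The dequeue path and the head-protector scheme described in the comments are easier to study in isolation. Below is a stripped-down, single-process model using std::atomic; it is not part of the patch, and the names (ModelQueue, effective_head, protector_free) are invented for illustration. It demonstrates the same technique: a consumer claims the dequeue critical section by CAS-ing the protector from a sentinel value to the head it observed, and everyone else computes the visible head as min(head, protector), so the producer never treats the slot currently being read as free. Storing the sentinel back with release ordering mirrors the memory_order_release store in try_dequeue() above.

    #include <algorithm>
    #include <array>
    #include <atomic>
    #include <cstddef>
    #include <limits>
    #include <optional>
    #include <utility>

    constexpr size_t protector_free = std::numeric_limits<size_t>::max();

    // Simplified model of the queue's dequeue protection: single process,
    // plain std::atomic, no shared memory, error types or cache alignment.
    template<typename T, size_t Size>
    struct ModelQueue {
        std::atomic<size_t> tail { 0 };
        std::atomic<size_t> head { 0 };
        std::atomic<size_t> head_protector { protector_free };
        std::array<T, Size> data {};

        // The externally visible head: while a dequeue is in flight, this stays at
        // the protector value, so the producer cannot reuse the slot being read.
        size_t effective_head() const { return std::min(head.load(), head_protector.load()); }

        bool try_enqueue(T value) // single producer only
        {
            if (((effective_head() - 1) % Size) == (tail.load() % Size))
                return false; // full
            data[tail.load() % Size] = std::move(value);
            tail.fetch_add(1);
            return true;
        }

        std::optional<T> try_dequeue() // any number of consumers
        {
            for (;;) {
                if (effective_head() >= tail.load())
                    return std::nullopt; // empty
                // Claim the critical section; succeeds only if no other consumer is mid-dequeue.
                auto expected = protector_free;
                if (head_protector.compare_exchange_strong(expected, head.load())) {
                    auto old_head = head.load();
                    T value = std::move(data[old_head % Size]);
                    head.fetch_add(1);
                    head_protector.store(protector_free, std::memory_order_release);
                    return value;
                }
            }
        }
    };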