1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-10-25 13:02:07 +00:00
serenity/Userland/Libraries/LibCore/SharedCircularQueue.h
kleines Filmröllchen 6b13436ef6 LibCore: Introduce SharedSingleProducerCircularQueue
This new class with an admittedly long OOP-y name provides a circular
queue in shared memory. The queue is a lock-free synchronous queue
implemented with atomics, and its implementation is significantly
simplified by only accounting for one producer (and multiple consumers).
It is intended to be used as a producer-consumer communication
datastructure across processes. The original motivation behind this
class is efficient short-period transfer of audio data in userspace.

This class includes formal proofs of several correctness properties of
the main queue operations `enqueue` and `dequeue`. These proofs are not
100% complete in their existing form as the invariants they depend on
are "handwaved". This seems fine to me right now, as any proof is better
than no proof :^). Anyways, the proofs should build confidence that the
implemented algorithms, which are only roughly based on existing work,
operate correctly in even the worst-case concurrency scenarios.
2022-04-21 13:55:00 +02:00

230 lines
9.1 KiB
C++

/*
* Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/Assertions.h>
#include <AK/Atomic.h>
#include <AK/BuiltinWrappers.h>
#include <AK/Debug.h>
#include <AK/Error.h>
#include <AK/Format.h>
#include <AK/Function.h>
#include <AK/NonnullRefPtr.h>
#include <AK/NumericLimits.h>
#include <AK/Platform.h>
#include <AK/RefCounted.h>
#include <AK/RefPtr.h>
#include <AK/String.h>
#include <AK/Types.h>
#include <AK/Variant.h>
#include <AK/Weakable.h>
#include <LibCore/AnonymousBuffer.h>
#include <LibCore/System.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <sys/mman.h>
namespace Core {
// A circular lock-free queue (or a buffer) with a single producer,
// residing in shared memory and designed to be accessible to multiple processes.
// This implementation makes use of the fact that any producer-related code can be sure that
// it's the only producer-related code that is running, which simplifies a bunch of the synchronization code.
// The exclusivity and liveness of the critical sections in this class are proven to be correct
// under the assumption of correct synchronization primitives, i.e. atomics.
// In many circumstances, this is enough for cross-process queues.
// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory.
// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user.
// FIXME: Make this independent of shared memory, so that we can move it to AK.
// clang-format off
template<typename T, size_t Size = 32>
// Size must be a power of two, which speeds up the modulus operations for indexing.
requires(popcount(Size) == 1)
class SharedSingleProducerCircularQueue final {
// clang-format on
public:
using ValueType = T;
// Status codes reported through the error channel of the queue operations in this class.
enum class QueueStatus : u8 {
// Zero value. Not produced by any operation in this header.
Invalid = 0,
// Reported by try_enqueue() when can_enqueue() finds no free slot.
Full,
// Reported by try_dequeue() when there is nothing to dequeue.
Empty,
};
// Creates a handle that refers to no shared queue; is_valid() returns false for it.
SharedSingleProducerCircularQueue() = default;
// Copying yields another handle to the same ref-counted shared queue; the queue contents are not duplicated.
// The source is taken by const reference so that const handles can be copied as well.
SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue const& queue) = default;
// Moving transfers the handle (reference and name) to the new object.
SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default;
SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;
// Allocates a new circular queue in shared memory.
// On success the returned queue holds the freshly created file descriptor and closes it when the last handle dies.
// If mapping or initializing the queue fails, the descriptor is closed here instead of being leaked.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create()
{
    auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
    auto queue_or_error = try_create_internal(fd, true);
    if (queue_or_error.is_error()) {
        // No queue object took over the descriptor, so nobody else would ever close it.
        // A failure of close() itself is deliberately ignored; the original error is what the caller needs to see.
        (void)System::close(fd);
    }
    return queue_or_error;
}
// Attaches to a circular queue that already exists in the shared memory behind `fd`.
// The shared state is mapped as-is and is not re-initialized.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create(int fd)
{
    constexpr bool initialize_shared_state = false;
    return try_create_internal(fd, initialize_shared_state);
}
// Number of slots in the backing array, i.e. the Size template parameter.
constexpr size_t size() const { return Size; }
// These functions are provably inconsistent and should only be used as hints to the actual capacity and used count.
// Hint for the remaining space, computed as Size - weak_used().
ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); }
// Hint for the number of stored elements: the distance between the tail and head counters.
// The two counters are read with separate relaxed loads, so they need not belong to the same instant.
ALWAYS_INLINE size_t weak_used() const
{
    auto const tail = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed);
    auto const head = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed);
    return tail - head;
}
// File descriptor of the shared memory backing this queue. Dereferences m_queue, so the handle must be valid.
ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; }
// Whether this handle refers to a shared queue at all, i.e. whether m_queue is non-null.
ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }
// Relaxed snapshots of the raw counters. weak_head() reads m_head directly and, unlike head(), ignores m_head_protector.
ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }
// Attempts to append `to_insert` without waiting.
// Returns QueueStatus::Full if can_enqueue() finds no free slot; otherwise the value is stored and the tail is advanced.
// This is producer-side code: the class assumes there is only ever one producer running it.
ErrorOr<void, QueueStatus> try_enqueue(ValueType to_insert)
{
    VERIFY(!m_queue.is_null());
    if (!can_enqueue())
        return QueueStatus::Full;

    auto& shared_queue = *m_queue->m_queue;
    auto const slot_index = shared_queue.m_tail.load() % Size;
    shared_queue.m_data[slot_index] = to_insert;
    // The tail is bumped only after the slot has been written; dequeuers compare against the tail to find elements.
    ++shared_queue.m_tail;
    return {};
}
// Whether an enqueue would currently succeed.
// The queue counts as full when the slot just before the head (as reported by head()) is the tail's slot,
// which means one slot of the backing array always stays unused.
ALWAYS_INLINE bool can_enqueue() const
{
    auto const slot_before_head = (head() - 1) % Size;
    auto const tail_slot = m_queue->m_queue->m_tail.load() % Size;
    return slot_before_head != tail_slot;
}
// Keeps trying to enqueue `to_insert`, calling wait_function after every attempt that fails because the queue is full.
// Any failure status other than Full is turned into a generic error and ends the loop.
ErrorOr<void> try_blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
{
    for (;;) {
        auto attempt = try_enqueue(to_insert);
        if (!attempt.is_error())
            return {};
        if (attempt.error() != QueueStatus::Full)
            return Error::from_string_literal("Unexpected error while enqueuing"sv);
        wait_function();
    }
}
// Attempts to remove and return the oldest element.
// Returns QueueStatus::Empty if there is nothing to dequeue.
// Dequeuers are serialized through m_head_protector: whoever wins the CAS on it is the only one touching m_head,
// and everybody else loops and retries.
ErrorOr<ValueType, QueueStatus> try_dequeue()
{
    VERIFY(!m_queue.is_null());
    auto& shared_queue = *m_queue->m_queue;
    while (true) {
        // Cheap early-out that avoids contending on the protector.
        // The >= is not strictly necessary, but it feels safer :^)
        if (head() >= shared_queue.m_tail.load())
            return QueueStatus::Empty;
        // This CAS only succeeds if nobody is currently dequeuing.
        // It overwrites size_max on failure, so the expected value has to be reset on every iteration.
        auto size_max = NumericLimits<size_t>::max();
        if (shared_queue.m_head_protector.compare_exchange_strong(size_max, shared_queue.m_head.load())) {
            auto old_head = shared_queue.m_head.load();
            // Re-check emptiness now that we are the only dequeuer: another dequeuer may have taken the last
            // element between the early-out above and our CAS. Without this check we would hand out a stale
            // slot and push head past tail. The protector has to be rolled back before bailing out.
            if (old_head >= shared_queue.m_tail.load()) {
                shared_queue.m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
                return QueueStatus::Empty;
            }
            auto data = move(shared_queue.m_data[old_head % Size]);
            shared_queue.m_head.fetch_add(1);
            // Releasing the protector lets the next dequeuer in.
            shared_queue.m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
            return { move(data) };
        }
    }
}
// The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing.
// Returns the smaller of m_head and m_head_protector. The protector is set to NumericLimits<size_t>::max()
// whenever try_dequeue() is done with it, so outside of a dequeue this is simply m_head.
size_t head() const
{
return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load());
}
private:
// The state that is placed into the shared memory mapping: three counters plus the element storage.
// It is neither copyable nor movable.
struct SharedMemorySPCQ {
SharedMemorySPCQ() = default;
SharedMemorySPCQ(SharedMemorySPCQ const&) = delete;
SharedMemorySPCQ(SharedMemorySPCQ&&) = delete;
~SharedMemorySPCQ() = default;
// Invariant: tail >= head
// Invariant: head and tail are monotonically increasing
// Invariant: tail always points to the next free location where an enqueue can happen.
// Invariant: head always points to the element to be dequeued next.
// Invariant: tail is only modified by enqueue functions.
// Invariant: head is only modified by dequeue functions.
// An empty queue is signalled with: tail = head
// A full queue is signalled with: head - 1 mod size = tail mod size (i.e. the tail's slot is the one directly before the head's slot, so one slot always stays unused)
// FIXME: These invariants aren't proven to be correct after each successful completion of each operation where it is relevant.
// The work could be put in but for now I think the algorithmic correctness proofs of the functions are enough.
// Counts enqueued elements; advanced by try_enqueue(). Array indices are derived from it modulo Size.
CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_tail { 0 };
// Counts dequeued elements; advanced by try_dequeue(). Array indices are derived from it modulo Size.
CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head { 0 };
// NumericLimits<size_t>::max() while no dequeue is in progress; a dequeuer swaps in a head value via CAS to claim the head.
CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head_protector { NumericLimits<size_t>::max() };
// Element storage, indexed by (counter % Size).
alignas(ValueType) Array<ValueType, Size> m_data;
};
// Ref-counted holder of one mapping of the shared queue together with its file descriptor.
// Its destructor closes the descriptor and unmaps the memory, so both live as long as any queue handle refers to this object.
class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> {
friend class SharedSingleProducerCircularQueue;
public:
// Typed pointer to the shared state inside the mapping.
SharedMemorySPCQ* m_queue;
// The same address as m_queue, kept as void* for munmap().
void* m_raw;
// File descriptor of the shared memory; closed in the destructor.
int m_fd;
~RefCountedSharedMemorySPCQ()
{
// Both calls are wrapped in MUST: a destructor has no way to propagate their errors.
MUST(System::close(m_fd));
MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
}
private:
// Private so that only the befriended queue class can create instances.
RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd)
: m_queue(queue)
, m_raw(reinterpret_cast<void*>(queue))
, m_fd(fd)
{
}
};
// Maps the shared memory behind `fd` and wraps it in a queue handle.
// If `is_new` is true, the shared state is constructed in place inside the mapping; otherwise the mapping is used as-is.
// On success the returned queue holds both the mapping and `fd` and releases them when the last handle dies.
// On failure nothing is kept: the mapping (if any) is released again and `fd` is left untouched.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create_internal(int fd, bool is_new)
{
    auto name = String::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
    auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
    dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);

    SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);

    if (!shared_queue)
        return Error::from_string_literal("Unexpected error when creating shared queue from raw memory"sv);

    // new (nothrow) yields nullptr when memory is exhausted, so the result must be checked before it is dereferenced.
    auto* ref_counted_queue = new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd);
    if (!ref_counted_queue) {
        // Nothing owns the mapping yet, so release it here. An munmap failure is ignored in favor of reporting ENOMEM.
        (void)System::munmap(raw_mapping, sizeof(SharedMemorySPCQ));
        return Error::from_errno(ENOMEM);
    }

    return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*ref_counted_queue) };
}
// Builds a handle from an already created ref-counted shared queue and its mapping name.
// Both arguments are taken by value and moved into the members, so no extra reference is taken on `queue`.
SharedSingleProducerCircularQueue(String name, RefPtr<RefCountedSharedMemorySPCQ> queue)
    : m_queue(move(queue))
    , m_name(move(name))
{
}
// Shared reference to the mapping and file descriptor; null for a handle that is not valid (see is_valid()).
RefPtr<RefCountedSharedMemorySPCQ> m_queue;
// Name that try_create_internal() passes to mmap for the mapping ("SharedSingleProducerCircularQueue@" followed by the fd in hex).
String m_name {};
};
}