
LibCore: Remove try_ prefix from fallible SharedCircularQueue methods

Linus Groh 2023-01-28 20:12:17 +00:00 committed by Jelle Raaijmakers
parent 65fa7db2b5
commit 108ea2b921
6 changed files with 36 additions and 36 deletions
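For callers the change is purely mechanical: the methods are still fallible and still return ErrorOr (with QueueStatus as the error where applicable), only the try_ prefix is dropped. As a rough sketch of the new call sites, assuming an alias like the TestQueue used in the tests below (the element type and capacity here are illustrative, not part of the commit):

    #include <AK/Format.h>
    #include <AK/Try.h>
    #include <LibCore/SharedCircularQueue.h>

    // Illustrative alias mirroring the tests below; the element type and capacity
    // are assumptions for this sketch.
    using TestQueue = Core::SharedSingleProducerCircularQueue<int, 32>;

    ErrorOr<void> example()
    {
        auto queue = TRY(TestQueue::create()); // was TestQueue::try_create()

        if (auto result = queue.enqueue(42); result.is_error()) // was try_enqueue()
            dbgln("enqueue failed with status {}", (int)result.error());

        auto element = queue.dequeue(); // was try_dequeue()
        if (!element.is_error())
            dbgln("dequeued {}", element.value());

        return {};
    }

Only the call sites change; the queue's semantics (single producer, shared memory, QueueStatus errors) stay the same.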

View file

@@ -18,23 +18,23 @@ Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, s
 TEST_CASE(simple_enqueue)
 {
-    auto queue = MUST(TestQueue::try_create());
+    auto queue = MUST(TestQueue::create());
     for (size_t i = 0; i < queue.size() - 1; ++i)
-        EXPECT(!queue.try_enqueue((int)i).is_error());
-    auto result = queue.try_enqueue(0);
+        EXPECT(!queue.enqueue((int)i).is_error());
+    auto result = queue.enqueue(0);
     EXPECT(result.is_error());
     EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
 }

 TEST_CASE(simple_dequeue)
 {
-    auto queue = MUST(TestQueue::try_create());
+    auto queue = MUST(TestQueue::create());
     auto const test_count = 10;
     for (int i = 0; i < test_count; ++i)
-        (void)queue.try_enqueue(i);
+        (void)queue.enqueue(i);
     for (int i = 0; i < test_count; ++i) {
-        auto const element = queue.try_dequeue();
+        auto const element = queue.dequeue();
         EXPECT(!element.is_error());
         EXPECT_EQ(element.value(), i);
     }
@@ -43,18 +43,18 @@ TEST_CASE(simple_dequeue)
 // There is one parallel consumer, but nobody is producing at the same time.
 TEST_CASE(simple_multithread)
 {
-    auto queue = MUST(TestQueue::try_create());
+    auto queue = MUST(TestQueue::create());
     auto const test_count = 10;
     for (int i = 0; i < test_count; ++i)
-        (void)queue.try_enqueue(i);
+        (void)queue.enqueue(i);
     auto second_thread = Threading::Thread::construct([&queue]() {
         auto copied_queue = queue;
         for (int i = 0; i < test_count; ++i) {
             QueueError result = TestQueue::QueueStatus::Invalid;
             do {
-                result = copied_queue.try_dequeue();
+                result = copied_queue.dequeue();
                 if (!result.is_error())
                     EXPECT_EQ(result.value(), i);
             } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
@@ -73,7 +73,7 @@ TEST_CASE(simple_multithread)
 // There is one parallel consumer and one parallel producer.
 TEST_CASE(producer_consumer_multithread)
 {
-    auto queue = MUST(TestQueue::try_create());
+    auto queue = MUST(TestQueue::create());

     // Ensure that we have the possibility of filling the queue up.
     auto const test_count = queue.size() * 4;
@@ -85,7 +85,7 @@ TEST_CASE(producer_consumer_multithread)
         for (size_t i = 0; i < test_count; ++i) {
             QueueError result = TestQueue::QueueStatus::Invalid;
             do {
-                result = copied_queue.try_dequeue();
+                result = copied_queue.dequeue();
                 if (!result.is_error())
                     EXPECT_EQ(result.value(), (int)i);
             } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
@@ -103,7 +103,7 @@ TEST_CASE(producer_consumer_multithread)
     for (size_t i = 0; i < test_count; ++i) {
         ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
         do {
-            result = queue.try_enqueue((int)i);
+            result = queue.enqueue((int)i);
         } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);

         if (result.is_error())
@@ -118,7 +118,7 @@ TEST_CASE(producer_consumer_multithread)
 // There are multiple parallel consumers, but nobody is producing at the same time.
 TEST_CASE(multi_consumer)
 {
-    auto queue = MUST(TestQueue::try_create());
+    auto queue = MUST(TestQueue::create());
     // This needs to be divisible by 4!
     size_t const test_count = queue.size() - 4;
     Atomic<size_t> dequeue_count = 0;
@@ -131,7 +131,7 @@ TEST_CASE(multi_consumer)
     };

     for (size_t i = 0; i < test_count; ++i)
-        (void)queue.try_enqueue((int)i);
+        (void)queue.enqueue((int)i);

     for (auto thread : threads)
         thread->start();
@@ -145,7 +145,7 @@ TEST_CASE(multi_consumer)
 // There are multiple parallel consumers and one parallel producer.
 TEST_CASE(single_producer_multi_consumer)
 {
-    auto queue = MUST(TestQueue::try_create());
+    auto queue = MUST(TestQueue::create());
     // Choose a higher number to provoke possible race conditions.
     size_t const test_count = queue.size() * 8;
     Atomic<size_t> dequeue_count = 0;
@@ -162,7 +162,7 @@ TEST_CASE(single_producer_multi_consumer)
     for (size_t i = 0; i < test_count; ++i) {
         ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
         do {
-            result = queue.try_enqueue((int)i);
+            result = queue.enqueue((int)i);
             // After we put something in the first time, let's wait while nobody has dequeued yet.
             while (dequeue_count.load() == 0)
                 ;
@@ -188,7 +188,7 @@ Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, s
         for (size_t i = 0; i < test_count / 4; ++i) {
            QueueError result = TestQueue::QueueStatus::Invalid;
            do {
-                result = copied_queue.try_dequeue();
+                result = copied_queue.dequeue();
                if (!result.is_error())
                    dequeue_count.fetch_add(1);
                // Give others time to do something.

View file

@@ -27,10 +27,10 @@ void FilterApplicationCommand::execute()
 static Singleton<ImageProcessor> s_image_processor;

 ImageProcessor::ImageProcessor()
-    : m_command_queue(MUST(Queue::try_create()))
+    : m_command_queue(MUST(Queue::create()))
     , m_processor_thread(Threading::Thread::construct([this]() {
         while (true) {
-            if (auto next_command = m_command_queue.try_dequeue(); !next_command.is_error()) {
+            if (auto next_command = m_command_queue.dequeue(); !next_command.is_error()) {
                 next_command.value()->execute();
             } else {
                 Threading::MutexLocker locker { m_wakeup_mutex };
@@ -51,7 +51,7 @@ ImageProcessor* ImageProcessor::the()
 ErrorOr<void> ImageProcessor::enqueue_command(NonnullRefPtr<ImageProcessingCommand> command)
 {
-    if (auto queue_status = m_command_queue.try_enqueue(move(command)); queue_status.is_error())
+    if (auto queue_status = m_command_queue.enqueue(move(command)); queue_status.is_error())
         return ENOSPC;

     if (!m_processor_thread->is_started()) {

View file

@@ -22,7 +22,7 @@ namespace Audio {
 ConnectionToServer::ConnectionToServer(NonnullOwnPtr<Core::Stream::LocalSocket> socket)
     : IPC::ConnectionToServer<AudioClientEndpoint, AudioServerEndpoint>(*this, move(socket))
-    , m_buffer(make<AudioQueue>(MUST(AudioQueue::try_create())))
+    , m_buffer(make<AudioQueue>(MUST(AudioQueue::create())))
     , m_user_queue(make<UserSampleQueue>())
     , m_background_audio_enqueuer(Threading::Thread::construct([this]() {
         // All the background thread does is run an event loop.
@@ -105,7 +105,7 @@ void ConnectionToServer::custom_event(Core::CustomEvent&)
         m_user_queue->discard_samples(available_samples);

         // FIXME: Could we receive interrupts in a good non-IPC way instead?
-        auto result = m_buffer->try_blocking_enqueue(next_chunk, [this]() {
+        auto result = m_buffer->blocking_enqueue(next_chunk, [this]() {
             nanosleep(&m_good_sleep_time, nullptr);
         });
         if (result.is_error())
@@ -115,12 +115,12 @@ void ConnectionToServer::custom_event(Core::CustomEvent&)
 ErrorOr<void, AudioQueue::QueueStatus> ConnectionToServer::realtime_enqueue(Array<Sample, AUDIO_BUFFER_SIZE> samples)
 {
-    return m_buffer->try_enqueue(samples);
+    return m_buffer->enqueue(samples);
 }

 ErrorOr<void> ConnectionToServer::blocking_realtime_enqueue(Array<Sample, AUDIO_BUFFER_SIZE> samples, Function<void()> wait_function)
 {
-    return m_buffer->try_blocking_enqueue(samples, move(wait_function));
+    return m_buffer->blocking_enqueue(samples, move(wait_function));
 }

 unsigned ConnectionToServer::total_played_samples() const

View file

@@ -62,16 +62,16 @@ public:
     SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;

     // Allocates a new circular queue in shared memory.
-    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create()
+    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create()
     {
         auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
-        return try_create_internal(fd, true);
+        return create_internal(fd, true);
     }

     // Uses an existing circular queue from given shared memory.
-    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create(int fd)
+    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd)
     {
-        return try_create_internal(fd, false);
+        return create_internal(fd, false);
     }

     constexpr size_t size() const { return Size; }
@@ -90,7 +90,7 @@ public:
     ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
     ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }

-    ErrorOr<void, QueueStatus> try_enqueue(ValueType to_insert)
+    ErrorOr<void, QueueStatus> enqueue(ValueType to_insert)
     {
         VERIFY(!m_queue.is_null());
         if (!can_enqueue())
@@ -108,11 +108,11 @@ public:
     }

     // Repeatedly try to enqueue, using the wait_function to wait if it's not possible
-    ErrorOr<void> try_blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
+    ErrorOr<void> blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
     {
         ErrorOr<void, QueueStatus> result;
         while (true) {
-            result = try_enqueue(to_insert);
+            result = enqueue(to_insert);
             if (!result.is_error())
                 break;
             if (result.error() != QueueStatus::Full)
@@ -123,7 +123,7 @@ public:
         return {};
     }

-    ErrorOr<ValueType, QueueStatus> try_dequeue()
+    ErrorOr<ValueType, QueueStatus> dequeue()
     {
         VERIFY(!m_queue.is_null());
         while (true) {
@@ -198,7 +198,7 @@ private:
         }
     };

-    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create_internal(int fd, bool is_new)
+    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(int fd, bool is_new)
     {
         auto name = DeprecatedString::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
         auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
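For illustration only (not part of this commit), a producer loop using the renamed blocking variant from the header above might look like this; the alias, capacity, and back-off interval are assumptions:

    #include <AK/Try.h>
    #include <LibCore/SharedCircularQueue.h>
    #include <unistd.h>

    // Illustrative alias; element type and capacity are assumptions for this sketch.
    using SampleQueue = Core::SharedSingleProducerCircularQueue<int, 32>;

    ErrorOr<void> produce_all(SampleQueue& queue, int count)
    {
        for (int i = 0; i < count; ++i) {
            // blocking_enqueue() (formerly try_blocking_enqueue()) keeps retrying while
            // the queue reports QueueStatus::Full, calling the wait function in between.
            TRY(queue.blocking_enqueue(i, [] { usleep(100); }));
        }
        return {};
    }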

View file

@@ -137,7 +137,7 @@ template<Concepts::SharedSingleProducerCircularQueue T>
 ErrorOr<T> decode(Decoder& decoder)
 {
     auto anon_file = TRY(decoder.decode<IPC::File>());
-    return T::try_create(anon_file.take_fd());
+    return T::create(anon_file.take_fd());
 }

 template<Concepts::Optional T>

View file

@@ -47,7 +47,7 @@ public:
             return false;

         if (m_in_chunk_location >= m_current_audio_chunk.size()) {
-            auto result = m_buffer->try_dequeue();
+            auto result = m_buffer->dequeue();
             if (result.is_error()) {
                 if (result.error() == Audio::AudioQueue::QueueStatus::Empty) {
                     dbgln("Audio client {} can't keep up!", m_client->client_id());
@@ -79,7 +79,7 @@ public:
     {
         ErrorOr<Array<Audio::Sample, Audio::AUDIO_BUFFER_SIZE>, Audio::AudioQueue::QueueStatus> result = Audio::AudioQueue::QueueStatus::Invalid;
         do {
-            result = m_buffer->try_dequeue();
+            result = m_buffer->dequeue();
         } while (result.is_error() && result.error() != Audio::AudioQueue::QueueStatus::Empty);
     }