mirror of
https://github.com/RGBCube/serenity
synced 2025-07-26 09:17:35 +00:00
LibPthread: Reimplement semaphores
This implementation does not use locking or condition variables internally; it's purely based on atomics and futexes. Notably, concurrent sem_wait() and sem_post() calls can run *completely in parallel* without slowing each other down, as long as there are empty slots for them all to succeed without blocking. Additionally, sem_wait() never executes an atomic operation with release ordering, and sem_post() never executes an atomic operation with acquire ordering (unless you count the syscall). This means the compiler and the hardware are free to reorder code *into* the critical section.
This commit is contained in:
parent
00d8dbe739
commit
690141ff8b
2 changed files with 107 additions and 132 deletions
|
@ -1,12 +1,26 @@
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2021, the SerenityOS developers.
|
* Copyright (c) 2021, Gunnar Beutner <gbeutner@serenityos.org>
|
||||||
|
* Copyright (c) 2021, Sergey Bugaev <bugaevc@serenityos.org>
|
||||||
*
|
*
|
||||||
* SPDX-License-Identifier: BSD-2-Clause
|
* SPDX-License-Identifier: BSD-2-Clause
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <AK/Assertions.h>
|
#include <AK/Assertions.h>
|
||||||
|
#include <AK/Atomic.h>
|
||||||
|
#include <AK/Types.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <semaphore.h>
|
#include <semaphore.h>
|
||||||
|
#include <serenity.h>
|
||||||
|
|
||||||
|
// Whether sem_wait() or sem_post() is responsible for waking any sleeping
|
||||||
|
// threads.
|
||||||
|
static constexpr u32 POST_WAKES = 1 << 31;
|
||||||
|
|
||||||
|
sem_t* sem_open(const char*, int, ...)
|
||||||
|
{
|
||||||
|
errno = ENOSYS;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
int sem_close(sem_t*)
|
int sem_close(sem_t*)
|
||||||
{
|
{
|
||||||
|
@ -14,42 +28,10 @@ int sem_close(sem_t*)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sem_destroy(sem_t* sem)
|
int sem_unlink(const char*)
|
||||||
{
|
{
|
||||||
auto rc = pthread_mutex_destroy(&sem->mtx);
|
errno = ENOSYS;
|
||||||
if (rc != 0) {
|
return -1;
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = pthread_cond_destroy(&sem->cv);
|
|
||||||
if (rc != 0) {
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int sem_getvalue(sem_t* sem, int* sval)
|
|
||||||
{
|
|
||||||
auto rc = pthread_mutex_trylock(&sem->mtx);
|
|
||||||
|
|
||||||
if (rc == EBUSY) {
|
|
||||||
*sval = 0;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rc != 0) {
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
*sval = sem->value;
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&sem->mtx);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int sem_init(sem_t* sem, int shared, unsigned int value)
|
int sem_init(sem_t* sem, int shared, unsigned int value)
|
||||||
|
@ -64,116 +46,110 @@ int sem_init(sem_t* sem, int shared, unsigned int value)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto rc = pthread_mutex_init(&sem->mtx, nullptr);
|
|
||||||
if (rc != 0) {
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = pthread_cond_init(&sem->cv, nullptr);
|
|
||||||
if (rc != 0) {
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
sem->value = value;
|
sem->value = value;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_t* sem_open(const char*, int, ...)
|
int sem_destroy(sem_t*)
|
||||||
{
|
{
|
||||||
errno = ENOSYS;
|
return 0;
|
||||||
return nullptr;
|
}
|
||||||
|
|
||||||
|
int sem_getvalue(sem_t* sem, int* sval)
|
||||||
|
{
|
||||||
|
u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
|
||||||
|
*sval = value & ~POST_WAKES;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sem_post(sem_t* sem)
|
int sem_post(sem_t* sem)
|
||||||
{
|
{
|
||||||
auto rc = pthread_mutex_lock(&sem->mtx);
|
u32 value = AK::atomic_fetch_add(&sem->value, 1u, AK::memory_order_release);
|
||||||
if (rc != 0) {
|
// Fast path: no need to wake.
|
||||||
errno = rc;
|
if (!(value & POST_WAKES)) [[likely]]
|
||||||
return -1;
|
return 0;
|
||||||
}
|
|
||||||
|
|
||||||
if (sem->value == SEM_VALUE_MAX) {
|
|
||||||
pthread_mutex_unlock(&sem->mtx);
|
|
||||||
errno = EOVERFLOW;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
sem->value++;
|
|
||||||
|
|
||||||
rc = pthread_cond_signal(&sem->cv);
|
|
||||||
if (rc != 0) {
|
|
||||||
pthread_mutex_unlock(&sem->mtx);
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = pthread_mutex_unlock(&sem->mtx);
|
|
||||||
if (rc != 0) {
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Pass the responsibility for waking more threads if more slots become
|
||||||
|
// available later to sem_wait() in the thread we're about to wake, as
|
||||||
|
// opposed to further sem_post() calls that free up those slots.
|
||||||
|
value = AK::atomic_fetch_and(&sem->value, ~POST_WAKES, AK::memory_order_relaxed);
|
||||||
|
// Check if another sem_post() call has handled it already.
|
||||||
|
if (!(value & POST_WAKES)) [[likely]]
|
||||||
|
return 0;
|
||||||
|
int rc = futex_wake(&sem->value, 1);
|
||||||
|
VERIFY(rc == 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sem_trywait(sem_t* sem)
|
int sem_trywait(sem_t* sem)
|
||||||
{
|
{
|
||||||
auto rc = pthread_mutex_lock(&sem->mtx);
|
u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
|
||||||
if (rc != 0) {
|
u32 count = value & ~POST_WAKES;
|
||||||
errno = rc;
|
if (count == 0)
|
||||||
return -1;
|
return EAGAIN;
|
||||||
}
|
// Decrement the count without touching the flag.
|
||||||
|
u32 desired = (count - 1) | (value & POST_WAKES);
|
||||||
if (sem->value == 0) {
|
bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire);
|
||||||
pthread_mutex_unlock(&sem->mtx);
|
if (exchanged) [[likely]]
|
||||||
errno = EAGAIN;
|
return 0;
|
||||||
return -1;
|
else
|
||||||
}
|
return EAGAIN;
|
||||||
|
|
||||||
sem->value--;
|
|
||||||
|
|
||||||
rc = pthread_mutex_unlock(&sem->mtx);
|
|
||||||
if (rc != 0) {
|
|
||||||
errno = rc;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int sem_unlink(const char*)
|
|
||||||
{
|
|
||||||
errno = ENOSYS;
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int sem_wait(sem_t* sem)
|
int sem_wait(sem_t* sem)
|
||||||
{
|
{
|
||||||
auto rc = pthread_mutex_lock(&sem->mtx);
|
return sem_timedwait(sem, nullptr);
|
||||||
if (rc != 0) {
|
}
|
||||||
errno = rc;
|
|
||||||
return -1;
|
int sem_timedwait(sem_t* sem, const struct timespec* abstime)
|
||||||
}
|
{
|
||||||
|
u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
|
||||||
while (sem->value == 0) {
|
bool responsible_for_waking = false;
|
||||||
rc = pthread_cond_wait(&sem->cv, &sem->mtx);
|
|
||||||
if (rc != 0) {
|
while (true) {
|
||||||
pthread_mutex_unlock(&sem->mtx);
|
u32 count = value & ~POST_WAKES;
|
||||||
errno = rc;
|
if (count > 0) [[likely]] {
|
||||||
return -1;
|
// It looks like there are some free slots.
|
||||||
}
|
u32 whether_post_wakes = value & POST_WAKES;
|
||||||
}
|
bool going_to_wake = false;
|
||||||
|
if (responsible_for_waking && !whether_post_wakes) {
|
||||||
sem->value--;
|
// If we have ourselves been woken up previously, and the
|
||||||
|
// POST_WAKES flag is not set, that means some more slots might
|
||||||
rc = pthread_mutex_unlock(&sem->mtx);
|
// be available now, and it's us who has to wake up additional
|
||||||
if (rc != 0) {
|
// threads.
|
||||||
errno = rc;
|
if (count > 1) [[unlikely]]
|
||||||
return -1;
|
going_to_wake = true;
|
||||||
}
|
// Pass the responsibility for waking up further threads back to
|
||||||
|
// sem_post() calls. In particular, we don't want the threads
|
||||||
return 0;
|
// we're about to wake to try to wake anyone else.
|
||||||
|
whether_post_wakes = POST_WAKES;
|
||||||
|
}
|
||||||
|
// Now, try to commit this.
|
||||||
|
u32 desired = (count - 1) | whether_post_wakes;
|
||||||
|
bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire);
|
||||||
|
if (!exchanged) [[unlikely]]
|
||||||
|
// Re-evaluate.
|
||||||
|
continue;
|
||||||
|
if (going_to_wake) [[unlikely]] {
|
||||||
|
int rc = futex_wake(&sem->value, count - 1);
|
||||||
|
VERIFY(rc >= 0);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// We're probably going to sleep, so attempt to set the flag. We do not
|
||||||
|
// commit to sleeping yet, though, as setting the flag may fail and
|
||||||
|
// cause us to reevaluate what we're doing.
|
||||||
|
if (value == 0) {
|
||||||
|
bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, POST_WAKES, AK::memory_order_relaxed);
|
||||||
|
if (!exchanged) [[unlikely]]
|
||||||
|
// Re-evaluate.
|
||||||
|
continue;
|
||||||
|
value = POST_WAKES;
|
||||||
|
}
|
||||||
|
// At this point, we're committed to sleeping.
|
||||||
|
responsible_for_waking = true;
|
||||||
|
futex_wait(&sem->value, value, abstime, CLOCK_REALTIME);
|
||||||
|
// This is the state we will probably see upon being waked:
|
||||||
|
value = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,9 +14,7 @@
|
||||||
__BEGIN_DECLS
|
__BEGIN_DECLS
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pthread_mutex_t mtx;
|
uint32_t value;
|
||||||
pthread_cond_t cv;
|
|
||||||
int value;
|
|
||||||
} sem_t;
|
} sem_t;
|
||||||
|
|
||||||
int sem_close(sem_t*);
|
int sem_close(sem_t*);
|
||||||
|
@ -28,6 +26,7 @@ int sem_post(sem_t*);
|
||||||
int sem_trywait(sem_t*);
|
int sem_trywait(sem_t*);
|
||||||
int sem_unlink(const char*);
|
int sem_unlink(const char*);
|
||||||
int sem_wait(sem_t*);
|
int sem_wait(sem_t*);
|
||||||
|
int sem_timedwait(sem_t*, const struct timespec* abstime);
|
||||||
|
|
||||||
#define SEM_VALUE_MAX INT_MAX
|
#define SEM_VALUE_MAX INT_MAX
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue