mirror of
https://github.com/RGBCube/serenity
synced 2025-07-25 23:37:35 +00:00
LibCore: Allow EventLoops to run on multiple threads safely
The event loop system was previously very singletony to the point that
there's only a single event loop stack per process and only one event
loop (the topmost) can run at a time. This commit simply makes the event
loop stack and related structures thread-local so that each thread has
an isolated event loop system.
Some things are kept at a global level and synchronized with the new
MutexProtected: The main event loop needs to still be obtainable from
anywhere, as it closes down the application when it exits. The ID
allocator is global as IDs should not be shared even between threads.
And for the inspector server connection, the same as for the main loop
holds.
Note that currently, the wake pipe is only created by the main thread,
so notifications don't work on other threads.
This removes the temporary mutex fix for notifiers, introduced in
0631d3fed5
.
This commit is contained in:
parent
3d6e08156d
commit
69c1910037
4 changed files with 80 additions and 61 deletions
|
@ -325,6 +325,8 @@ TEST_CASE(local_socket_read)
|
||||||
// connected.
|
// connected.
|
||||||
auto background_action = Threading::BackgroundAction<int>::construct(
|
auto background_action = Threading::BackgroundAction<int>::construct(
|
||||||
[](auto&) {
|
[](auto&) {
|
||||||
|
Core::EventLoop event_loop;
|
||||||
|
|
||||||
auto maybe_client_socket = Core::Stream::LocalSocket::connect("/tmp/test-socket");
|
auto maybe_client_socket = Core::Stream::LocalSocket::connect("/tmp/test-socket");
|
||||||
EXPECT(!maybe_client_socket.is_error());
|
EXPECT(!maybe_client_socket.is_error());
|
||||||
auto client_socket = maybe_client_socket.release_value();
|
auto client_socket = maybe_client_socket.release_value();
|
||||||
|
|
|
@ -121,7 +121,7 @@ void ThreadStackWidget::refresh()
|
||||||
[weak_this = make_weak_ptr()](auto result) {
|
[weak_this = make_weak_ptr()](auto result) {
|
||||||
if (!weak_this)
|
if (!weak_this)
|
||||||
return;
|
return;
|
||||||
Core::EventLoop::main().post_event(const_cast<Core::Object&>(*weak_this), make<CompletionEvent>(move(result)));
|
Core::EventLoop::with_main_locked([&](auto* main_event_loop) { main_event_loop->post_event(const_cast<Core::Object&>(*weak_this), make<CompletionEvent>(move(result))); });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
|
* Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
|
||||||
|
* Copyright (c) 2022, kleines Filmröllchen <malu.bertsch@gmail.com>
|
||||||
*
|
*
|
||||||
* SPDX-License-Identifier: BSD-2-Clause
|
* SPDX-License-Identifier: BSD-2-Clause
|
||||||
*/
|
*/
|
||||||
|
@ -21,6 +22,7 @@
|
||||||
#include <LibCore/Notifier.h>
|
#include <LibCore/Notifier.h>
|
||||||
#include <LibCore/Object.h>
|
#include <LibCore/Object.h>
|
||||||
#include <LibThreading/Mutex.h>
|
#include <LibThreading/Mutex.h>
|
||||||
|
#include <LibThreading/MutexProtected.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
@ -54,19 +56,20 @@ struct EventLoop::Private {
|
||||||
Threading::Mutex lock;
|
Threading::Mutex lock;
|
||||||
};
|
};
|
||||||
|
|
||||||
static EventLoop* s_main_event_loop;
|
// The main event loop is global to the program, so it may be accessed from multiple threads.
|
||||||
static Vector<EventLoop&>* s_event_loop_stack;
|
// NOTE: s_main_event_loop is not declared here as it is needed in the header.
|
||||||
static NeverDestroyed<IDAllocator> s_id_allocator;
|
static Threading::MutexProtected<NeverDestroyed<IDAllocator>> s_id_allocator;
|
||||||
static HashMap<int, NonnullOwnPtr<EventLoopTimer>>* s_timers;
|
static Threading::MutexProtected<RefPtr<InspectorServerConnection>> s_inspector_server_connection;
|
||||||
static HashTable<Notifier*>* s_notifiers;
|
|
||||||
static Threading::Mutex s_notifiers_mutex;
|
|
||||||
|
|
||||||
int EventLoop::s_wake_pipe_fds[2];
|
// Each thread has its own event loop stack, its own timers, notifiers and a wake pipe.
|
||||||
static RefPtr<InspectorServerConnection> s_inspector_server_connection;
|
static thread_local Vector<EventLoop&>* s_event_loop_stack;
|
||||||
|
static thread_local HashMap<int, NonnullOwnPtr<EventLoopTimer>>* s_timers;
|
||||||
|
static thread_local HashTable<Notifier*>* s_notifiers;
|
||||||
|
thread_local int EventLoop::s_wake_pipe_fds[2];
|
||||||
|
|
||||||
bool EventLoop::has_been_instantiated()
|
bool EventLoop::has_been_instantiated()
|
||||||
{
|
{
|
||||||
return s_main_event_loop;
|
return s_event_loop_stack != nullptr && !s_event_loop_stack->is_empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
class SignalHandlers : public RefCounted<SignalHandlers> {
|
class SignalHandlers : public RefCounted<SignalHandlers> {
|
||||||
|
@ -130,7 +133,9 @@ class InspectorServerConnection : public Object {
|
||||||
private:
|
private:
|
||||||
explicit InspectorServerConnection(RefPtr<LocalSocket> socket)
|
explicit InspectorServerConnection(RefPtr<LocalSocket> socket)
|
||||||
: m_socket(move(socket))
|
: m_socket(move(socket))
|
||||||
, m_client_id(s_id_allocator->allocate())
|
, m_client_id(s_id_allocator.with_locked([](auto& allocator) {
|
||||||
|
return allocator->allocate();
|
||||||
|
}))
|
||||||
{
|
{
|
||||||
#ifdef __serenity__
|
#ifdef __serenity__
|
||||||
add_child(*m_socket);
|
add_child(*m_socket);
|
||||||
|
@ -249,7 +254,7 @@ public:
|
||||||
|
|
||||||
void shutdown()
|
void shutdown()
|
||||||
{
|
{
|
||||||
s_id_allocator->deallocate(m_client_id);
|
s_id_allocator.with_locked([this](auto& allocator) { allocator->deallocate(m_client_id); });
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -266,30 +271,34 @@ EventLoop::EventLoop([[maybe_unused]] MakeInspectable make_inspectable)
|
||||||
s_timers = new HashMap<int, NonnullOwnPtr<EventLoopTimer>>;
|
s_timers = new HashMap<int, NonnullOwnPtr<EventLoopTimer>>;
|
||||||
s_notifiers = new HashTable<Notifier*>;
|
s_notifiers = new HashTable<Notifier*>;
|
||||||
}
|
}
|
||||||
|
s_main_event_loop.with_locked([&, this](auto*& main_event_loop) {
|
||||||
if (!s_main_event_loop) {
|
if (main_event_loop == nullptr) {
|
||||||
s_main_event_loop = this;
|
// FIXME: The compiler complains that we don't use main_event_loop although we set it.
|
||||||
s_pid = getpid();
|
main_event_loop = this;
|
||||||
|
s_pid = getpid();
|
||||||
|
// FIXME: We only create the wake pipe for the main thread
|
||||||
#if defined(SOCK_NONBLOCK)
|
#if defined(SOCK_NONBLOCK)
|
||||||
int rc = pipe2(s_wake_pipe_fds, O_CLOEXEC);
|
int rc = pipe2(s_wake_pipe_fds, O_CLOEXEC);
|
||||||
#else
|
#else
|
||||||
int rc = pipe(s_wake_pipe_fds);
|
int rc = pipe(s_wake_pipe_fds);
|
||||||
fcntl(s_wake_pipe_fds[0], F_SETFD, FD_CLOEXEC);
|
fcntl(s_wake_pipe_fds[0], F_SETFD, FD_CLOEXEC);
|
||||||
fcntl(s_wake_pipe_fds[1], F_SETFD, FD_CLOEXEC);
|
fcntl(s_wake_pipe_fds[1], F_SETFD, FD_CLOEXEC);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
VERIFY(rc == 0);
|
VERIFY(rc == 0);
|
||||||
s_event_loop_stack->append(*this);
|
s_event_loop_stack->append(*this);
|
||||||
|
|
||||||
#ifdef __serenity__
|
#ifdef __serenity__
|
||||||
if (getuid() != 0
|
if (getuid() != 0
|
||||||
&& make_inspectable == MakeInspectable::Yes
|
&& make_inspectable == MakeInspectable::Yes
|
||||||
&& !s_inspector_server_connection) {
|
// FIXME: Deadlock potential; though the main loop and inspector server connection are rarely used in conjunction
|
||||||
if (!connect_to_inspector_server())
|
&& s_inspector_server_connection.with_locked([](auto inspector_server_connection) { return inspector_server_connection; })) {
|
||||||
dbgln("Core::EventLoop: Failed to connect to InspectorServer");
|
if (!connect_to_inspector_server())
|
||||||
}
|
dbgln("Core::EventLoop: Failed to connect to InspectorServer");
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
dbgln_if(EVENTLOOP_DEBUG, "{} Core::EventLoop constructed :)", getpid());
|
dbgln_if(EVENTLOOP_DEBUG, "{} Core::EventLoop constructed :)", getpid());
|
||||||
}
|
}
|
||||||
|
@ -297,10 +306,12 @@ EventLoop::EventLoop([[maybe_unused]] MakeInspectable make_inspectable)
|
||||||
EventLoop::~EventLoop()
|
EventLoop::~EventLoop()
|
||||||
{
|
{
|
||||||
// NOTE: Pop the main event loop off of the stack when destroyed.
|
// NOTE: Pop the main event loop off of the stack when destroyed.
|
||||||
if (this == s_main_event_loop) {
|
s_main_event_loop.with_locked([this](auto*& main_event_loop) {
|
||||||
s_event_loop_stack->take_last();
|
if (this == main_event_loop) {
|
||||||
s_main_event_loop = nullptr;
|
s_event_loop_stack->take_last();
|
||||||
}
|
main_event_loop = nullptr;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bool connect_to_inspector_server()
|
bool connect_to_inspector_server()
|
||||||
|
@ -309,19 +320,15 @@ bool connect_to_inspector_server()
|
||||||
auto socket = Core::LocalSocket::construct();
|
auto socket = Core::LocalSocket::construct();
|
||||||
if (!socket->connect(SocketAddress::local("/tmp/portal/inspectables")))
|
if (!socket->connect(SocketAddress::local("/tmp/portal/inspectables")))
|
||||||
return false;
|
return false;
|
||||||
s_inspector_server_connection = InspectorServerConnection::construct(move(socket));
|
s_inspector_server_connection.with_locked([&](auto& inspector_server_connection) {
|
||||||
|
inspector_server_connection = InspectorServerConnection::construct(move(socket));
|
||||||
|
});
|
||||||
return true;
|
return true;
|
||||||
#else
|
#else
|
||||||
VERIFY_NOT_REACHED();
|
VERIFY_NOT_REACHED();
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
EventLoop& EventLoop::main()
|
|
||||||
{
|
|
||||||
VERIFY(s_main_event_loop);
|
|
||||||
return *s_main_event_loop;
|
|
||||||
}
|
|
||||||
|
|
||||||
EventLoop& EventLoop::current()
|
EventLoop& EventLoop::current()
|
||||||
{
|
{
|
||||||
return s_event_loop_stack->last();
|
return s_event_loop_stack->last();
|
||||||
|
@ -346,20 +353,25 @@ public:
|
||||||
EventLoopPusher(EventLoop& event_loop)
|
EventLoopPusher(EventLoop& event_loop)
|
||||||
: m_event_loop(event_loop)
|
: m_event_loop(event_loop)
|
||||||
{
|
{
|
||||||
if (&m_event_loop != s_main_event_loop) {
|
if (!is_main_event_loop()) {
|
||||||
m_event_loop.take_pending_events_from(EventLoop::current());
|
m_event_loop.take_pending_events_from(EventLoop::current());
|
||||||
s_event_loop_stack->append(event_loop);
|
s_event_loop_stack->append(event_loop);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
~EventLoopPusher()
|
~EventLoopPusher()
|
||||||
{
|
{
|
||||||
if (&m_event_loop != s_main_event_loop) {
|
if (!is_main_event_loop()) {
|
||||||
s_event_loop_stack->take_last();
|
s_event_loop_stack->take_last();
|
||||||
EventLoop::current().take_pending_events_from(m_event_loop);
|
EventLoop::current().take_pending_events_from(m_event_loop);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
bool is_main_event_loop()
|
||||||
|
{
|
||||||
|
return s_main_event_loop.with_locked([this](auto* main_event_loop) { return &m_event_loop == main_event_loop; });
|
||||||
|
}
|
||||||
|
|
||||||
EventLoop& m_event_loop;
|
EventLoop& m_event_loop;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -571,7 +583,7 @@ void EventLoop::notify_forked(ForkEvent event)
|
||||||
{
|
{
|
||||||
switch (event) {
|
switch (event) {
|
||||||
case ForkEvent::Child:
|
case ForkEvent::Child:
|
||||||
s_main_event_loop = nullptr;
|
s_main_event_loop.with_locked([]([[maybe_unused]] auto*& main_event_loop) { main_event_loop = nullptr; });
|
||||||
s_event_loop_stack->clear();
|
s_event_loop_stack->clear();
|
||||||
s_timers->clear();
|
s_timers->clear();
|
||||||
s_notifiers->clear();
|
s_notifiers->clear();
|
||||||
|
@ -581,7 +593,7 @@ void EventLoop::notify_forked(ForkEvent event)
|
||||||
}
|
}
|
||||||
s_pid = 0;
|
s_pid = 0;
|
||||||
#ifdef __serenity__
|
#ifdef __serenity__
|
||||||
s_inspector_server_connection = nullptr;
|
s_main_event_loop.with_locked([]([[maybe_unused]] auto*& main_event_loop) { main_event_loop = nullptr; });
|
||||||
#endif
|
#endif
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -608,16 +620,13 @@ retry:
|
||||||
add_fd_to_set(s_wake_pipe_fds[0], rfds);
|
add_fd_to_set(s_wake_pipe_fds[0], rfds);
|
||||||
max_fd = max(max_fd, max_fd_added);
|
max_fd = max(max_fd, max_fd_added);
|
||||||
|
|
||||||
{
|
for (auto& notifier : *s_notifiers) {
|
||||||
Threading::MutexLocker locker(s_notifiers_mutex);
|
if (notifier->event_mask() & Notifier::Read)
|
||||||
for (auto& notifier : *s_notifiers) {
|
add_fd_to_set(notifier->fd(), rfds);
|
||||||
if (notifier->event_mask() & Notifier::Read)
|
if (notifier->event_mask() & Notifier::Write)
|
||||||
add_fd_to_set(notifier->fd(), rfds);
|
add_fd_to_set(notifier->fd(), wfds);
|
||||||
if (notifier->event_mask() & Notifier::Write)
|
if (notifier->event_mask() & Notifier::Exceptional)
|
||||||
add_fd_to_set(notifier->fd(), wfds);
|
VERIFY_NOT_REACHED();
|
||||||
if (notifier->event_mask() & Notifier::Exceptional)
|
|
||||||
VERIFY_NOT_REACHED();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool queued_events_is_empty;
|
bool queued_events_is_empty;
|
||||||
|
@ -704,7 +713,6 @@ try_select_again:
|
||||||
if (!marked_fd_count)
|
if (!marked_fd_count)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Threading::MutexLocker locker(s_notifiers_mutex);
|
|
||||||
for (auto& notifier : *s_notifiers) {
|
for (auto& notifier : *s_notifiers) {
|
||||||
if (FD_ISSET(notifier->fd(), &rfds)) {
|
if (FD_ISSET(notifier->fd(), &rfds)) {
|
||||||
if (notifier->event_mask() & Notifier::Event::Read)
|
if (notifier->event_mask() & Notifier::Event::Read)
|
||||||
|
@ -752,7 +760,7 @@ int EventLoop::register_timer(Object& object, int milliseconds, bool should_relo
|
||||||
timer->reload(Time::now_monotonic_coarse());
|
timer->reload(Time::now_monotonic_coarse());
|
||||||
timer->should_reload = should_reload;
|
timer->should_reload = should_reload;
|
||||||
timer->fire_when_not_visible = fire_when_not_visible;
|
timer->fire_when_not_visible = fire_when_not_visible;
|
||||||
int timer_id = s_id_allocator->allocate();
|
int timer_id = s_id_allocator.with_locked([](auto& allocator) { return allocator->allocate(); });
|
||||||
timer->timer_id = timer_id;
|
timer->timer_id = timer_id;
|
||||||
s_timers->set(timer_id, move(timer));
|
s_timers->set(timer_id, move(timer));
|
||||||
return timer_id;
|
return timer_id;
|
||||||
|
@ -760,7 +768,7 @@ int EventLoop::register_timer(Object& object, int milliseconds, bool should_relo
|
||||||
|
|
||||||
bool EventLoop::unregister_timer(int timer_id)
|
bool EventLoop::unregister_timer(int timer_id)
|
||||||
{
|
{
|
||||||
s_id_allocator->deallocate(timer_id);
|
s_id_allocator.with_locked([&](auto& allocator) { allocator->deallocate(timer_id); });
|
||||||
auto it = s_timers->find(timer_id);
|
auto it = s_timers->find(timer_id);
|
||||||
if (it == s_timers->end())
|
if (it == s_timers->end())
|
||||||
return false;
|
return false;
|
||||||
|
@ -770,13 +778,11 @@ bool EventLoop::unregister_timer(int timer_id)
|
||||||
|
|
||||||
void EventLoop::register_notifier(Badge<Notifier>, Notifier& notifier)
|
void EventLoop::register_notifier(Badge<Notifier>, Notifier& notifier)
|
||||||
{
|
{
|
||||||
Threading::MutexLocker locker(s_notifiers_mutex);
|
|
||||||
s_notifiers->set(¬ifier);
|
s_notifiers->set(¬ifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
void EventLoop::unregister_notifier(Badge<Notifier>, Notifier& notifier)
|
void EventLoop::unregister_notifier(Badge<Notifier>, Notifier& notifier)
|
||||||
{
|
{
|
||||||
Threading::MutexLocker locker(s_notifiers_mutex);
|
|
||||||
s_notifiers->remove(¬ifier);
|
s_notifiers->remove(¬ifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
|
* Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
|
||||||
|
* Copyright (c) 2022, kleines Filmröllchen <malu.bertsch@gmail.com>
|
||||||
*
|
*
|
||||||
* SPDX-License-Identifier: BSD-2-Clause
|
* SPDX-License-Identifier: BSD-2-Clause
|
||||||
*/
|
*/
|
||||||
|
@ -18,11 +19,14 @@
|
||||||
#include <LibCore/DeferredInvocationContext.h>
|
#include <LibCore/DeferredInvocationContext.h>
|
||||||
#include <LibCore/Event.h>
|
#include <LibCore/Event.h>
|
||||||
#include <LibCore/Forward.h>
|
#include <LibCore/Forward.h>
|
||||||
|
#include <LibThreading/MutexProtected.h>
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
|
|
||||||
namespace Core {
|
namespace Core {
|
||||||
|
|
||||||
|
static Threading::MutexProtected<EventLoop*> s_main_event_loop;
|
||||||
|
|
||||||
class EventLoop {
|
class EventLoop {
|
||||||
public:
|
public:
|
||||||
enum class MakeInspectable {
|
enum class MakeInspectable {
|
||||||
|
@ -48,7 +52,14 @@ public:
|
||||||
|
|
||||||
void post_event(Object& receiver, NonnullOwnPtr<Event>&&);
|
void post_event(Object& receiver, NonnullOwnPtr<Event>&&);
|
||||||
|
|
||||||
static EventLoop& main();
|
template<typename Callback>
|
||||||
|
static decltype(auto) with_main_locked(Callback callback)
|
||||||
|
{
|
||||||
|
return s_main_event_loop.with_locked([&callback](auto*& event_loop) {
|
||||||
|
VERIFY(event_loop != nullptr);
|
||||||
|
return callback(event_loop);
|
||||||
|
});
|
||||||
|
}
|
||||||
static EventLoop& current();
|
static EventLoop& current();
|
||||||
|
|
||||||
bool was_exit_requested() const { return m_exit_requested; }
|
bool was_exit_requested() const { return m_exit_requested; }
|
||||||
|
@ -111,7 +122,7 @@ private:
|
||||||
bool m_exit_requested { false };
|
bool m_exit_requested { false };
|
||||||
int m_exit_code { 0 };
|
int m_exit_code { 0 };
|
||||||
|
|
||||||
static int s_wake_pipe_fds[2];
|
static thread_local int s_wake_pipe_fds[2];
|
||||||
|
|
||||||
struct Private;
|
struct Private;
|
||||||
NonnullOwnPtr<Private> m_private;
|
NonnullOwnPtr<Private> m_private;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue