From 69c1910037e5ed21243307c073fdaa039713be6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?kleines=20Filmr=C3=B6llchen?= Date: Sun, 2 Jan 2022 14:52:38 +0100 Subject: [PATCH] LibCore: Allow EventLoops to run on multiple threads safely The event loop system was previously very singletony to the point that there's only a single event loop stack per process and only one event loop (the topmost) can run at a time. This commit simply makes the event loop stack and related structures thread-local so that each thread has an isolated event loop system. Some things are kept at a global level and synchronized with the new MutexProtected: The main event loop needs to still be obtainable from anywhere, as it closes down the application when it exits. The ID allocator is global as IDs should not be shared even between threads. And for the inspector server connection, the same as for the main loop holds. Note that currently, the wake pipe is only created by the main thread, so notifications don't work on other threads. This removes the temporary mutex fix for notifiers, introduced in 0631d3fed5623c1f2b0d6085ab24e4dd69c6ce99 . --- Tests/LibCore/TestLibCoreStream.cpp | 2 + .../SystemMonitor/ThreadStackWidget.cpp | 2 +- Userland/Libraries/LibCore/EventLoop.cpp | 122 +++++++++--------- Userland/Libraries/LibCore/EventLoop.h | 15 ++- 4 files changed, 80 insertions(+), 61 deletions(-) diff --git a/Tests/LibCore/TestLibCoreStream.cpp b/Tests/LibCore/TestLibCoreStream.cpp index ec9b35fa10..9447e79b85 100644 --- a/Tests/LibCore/TestLibCoreStream.cpp +++ b/Tests/LibCore/TestLibCoreStream.cpp @@ -325,6 +325,8 @@ TEST_CASE(local_socket_read) // connected. auto background_action = Threading::BackgroundAction::construct( [](auto&) { + Core::EventLoop event_loop; + auto maybe_client_socket = Core::Stream::LocalSocket::connect("/tmp/test-socket"); EXPECT(!maybe_client_socket.is_error()); auto client_socket = maybe_client_socket.release_value(); diff --git a/Userland/Applications/SystemMonitor/ThreadStackWidget.cpp b/Userland/Applications/SystemMonitor/ThreadStackWidget.cpp index 74dc0991c6..baeba6e377 100644 --- a/Userland/Applications/SystemMonitor/ThreadStackWidget.cpp +++ b/Userland/Applications/SystemMonitor/ThreadStackWidget.cpp @@ -121,7 +121,7 @@ void ThreadStackWidget::refresh() [weak_this = make_weak_ptr()](auto result) { if (!weak_this) return; - Core::EventLoop::main().post_event(const_cast(*weak_this), make(move(result))); + Core::EventLoop::with_main_locked([&](auto* main_event_loop) { main_event_loop->post_event(const_cast(*weak_this), make(move(result))); }); }); } diff --git a/Userland/Libraries/LibCore/EventLoop.cpp b/Userland/Libraries/LibCore/EventLoop.cpp index 66ce7efe87..a7804bddc9 100644 --- a/Userland/Libraries/LibCore/EventLoop.cpp +++ b/Userland/Libraries/LibCore/EventLoop.cpp @@ -1,5 +1,6 @@ /* * Copyright (c) 2018-2020, Andreas Kling + * Copyright (c) 2022, kleines Filmröllchen * * SPDX-License-Identifier: BSD-2-Clause */ @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -54,19 +56,20 @@ struct EventLoop::Private { Threading::Mutex lock; }; -static EventLoop* s_main_event_loop; -static Vector* s_event_loop_stack; -static NeverDestroyed s_id_allocator; -static HashMap>* s_timers; -static HashTable* s_notifiers; -static Threading::Mutex s_notifiers_mutex; +// The main event loop is global to the program, so it may be accessed from multiple threads. +// NOTE: s_main_event_loop is not declared here as it is needed in the header. +static Threading::MutexProtected> s_id_allocator; +static Threading::MutexProtected> s_inspector_server_connection; -int EventLoop::s_wake_pipe_fds[2]; -static RefPtr s_inspector_server_connection; +// Each thread has its own event loop stack, its own timers, notifiers and a wake pipe. +static thread_local Vector* s_event_loop_stack; +static thread_local HashMap>* s_timers; +static thread_local HashTable* s_notifiers; +thread_local int EventLoop::s_wake_pipe_fds[2]; bool EventLoop::has_been_instantiated() { - return s_main_event_loop; + return s_event_loop_stack != nullptr && !s_event_loop_stack->is_empty(); } class SignalHandlers : public RefCounted { @@ -130,7 +133,9 @@ class InspectorServerConnection : public Object { private: explicit InspectorServerConnection(RefPtr socket) : m_socket(move(socket)) - , m_client_id(s_id_allocator->allocate()) + , m_client_id(s_id_allocator.with_locked([](auto& allocator) { + return allocator->allocate(); + })) { #ifdef __serenity__ add_child(*m_socket); @@ -249,7 +254,7 @@ public: void shutdown() { - s_id_allocator->deallocate(m_client_id); + s_id_allocator.with_locked([this](auto& allocator) { allocator->deallocate(m_client_id); }); } private: @@ -266,30 +271,34 @@ EventLoop::EventLoop([[maybe_unused]] MakeInspectable make_inspectable) s_timers = new HashMap>; s_notifiers = new HashTable; } - - if (!s_main_event_loop) { - s_main_event_loop = this; - s_pid = getpid(); + s_main_event_loop.with_locked([&, this](auto*& main_event_loop) { + if (main_event_loop == nullptr) { + // FIXME: The compiler complains that we don't use main_event_loop although we set it. + main_event_loop = this; + s_pid = getpid(); + // FIXME: We only create the wake pipe for the main thread #if defined(SOCK_NONBLOCK) - int rc = pipe2(s_wake_pipe_fds, O_CLOEXEC); + int rc = pipe2(s_wake_pipe_fds, O_CLOEXEC); #else - int rc = pipe(s_wake_pipe_fds); - fcntl(s_wake_pipe_fds[0], F_SETFD, FD_CLOEXEC); - fcntl(s_wake_pipe_fds[1], F_SETFD, FD_CLOEXEC); + int rc = pipe(s_wake_pipe_fds); + fcntl(s_wake_pipe_fds[0], F_SETFD, FD_CLOEXEC); + fcntl(s_wake_pipe_fds[1], F_SETFD, FD_CLOEXEC); #endif - VERIFY(rc == 0); - s_event_loop_stack->append(*this); + VERIFY(rc == 0); + s_event_loop_stack->append(*this); #ifdef __serenity__ - if (getuid() != 0 - && make_inspectable == MakeInspectable::Yes - && !s_inspector_server_connection) { - if (!connect_to_inspector_server()) - dbgln("Core::EventLoop: Failed to connect to InspectorServer"); - } + if (getuid() != 0 + && make_inspectable == MakeInspectable::Yes + // FIXME: Deadlock potential; though the main loop and inspector server connection are rarely used in conjunction + && s_inspector_server_connection.with_locked([](auto inspector_server_connection) { return inspector_server_connection; })) { + if (!connect_to_inspector_server()) + dbgln("Core::EventLoop: Failed to connect to InspectorServer"); + } #endif - } + } + }); dbgln_if(EVENTLOOP_DEBUG, "{} Core::EventLoop constructed :)", getpid()); } @@ -297,10 +306,12 @@ EventLoop::EventLoop([[maybe_unused]] MakeInspectable make_inspectable) EventLoop::~EventLoop() { // NOTE: Pop the main event loop off of the stack when destroyed. - if (this == s_main_event_loop) { - s_event_loop_stack->take_last(); - s_main_event_loop = nullptr; - } + s_main_event_loop.with_locked([this](auto*& main_event_loop) { + if (this == main_event_loop) { + s_event_loop_stack->take_last(); + main_event_loop = nullptr; + } + }); } bool connect_to_inspector_server() @@ -309,19 +320,15 @@ bool connect_to_inspector_server() auto socket = Core::LocalSocket::construct(); if (!socket->connect(SocketAddress::local("/tmp/portal/inspectables"))) return false; - s_inspector_server_connection = InspectorServerConnection::construct(move(socket)); + s_inspector_server_connection.with_locked([&](auto& inspector_server_connection) { + inspector_server_connection = InspectorServerConnection::construct(move(socket)); + }); return true; #else VERIFY_NOT_REACHED(); #endif } -EventLoop& EventLoop::main() -{ - VERIFY(s_main_event_loop); - return *s_main_event_loop; -} - EventLoop& EventLoop::current() { return s_event_loop_stack->last(); @@ -346,20 +353,25 @@ public: EventLoopPusher(EventLoop& event_loop) : m_event_loop(event_loop) { - if (&m_event_loop != s_main_event_loop) { + if (!is_main_event_loop()) { m_event_loop.take_pending_events_from(EventLoop::current()); s_event_loop_stack->append(event_loop); } } ~EventLoopPusher() { - if (&m_event_loop != s_main_event_loop) { + if (!is_main_event_loop()) { s_event_loop_stack->take_last(); EventLoop::current().take_pending_events_from(m_event_loop); } } private: + bool is_main_event_loop() + { + return s_main_event_loop.with_locked([this](auto* main_event_loop) { return &m_event_loop == main_event_loop; }); + } + EventLoop& m_event_loop; }; @@ -571,7 +583,7 @@ void EventLoop::notify_forked(ForkEvent event) { switch (event) { case ForkEvent::Child: - s_main_event_loop = nullptr; + s_main_event_loop.with_locked([]([[maybe_unused]] auto*& main_event_loop) { main_event_loop = nullptr; }); s_event_loop_stack->clear(); s_timers->clear(); s_notifiers->clear(); @@ -581,7 +593,7 @@ void EventLoop::notify_forked(ForkEvent event) } s_pid = 0; #ifdef __serenity__ - s_inspector_server_connection = nullptr; + s_main_event_loop.with_locked([]([[maybe_unused]] auto*& main_event_loop) { main_event_loop = nullptr; }); #endif return; } @@ -608,16 +620,13 @@ retry: add_fd_to_set(s_wake_pipe_fds[0], rfds); max_fd = max(max_fd, max_fd_added); - { - Threading::MutexLocker locker(s_notifiers_mutex); - for (auto& notifier : *s_notifiers) { - if (notifier->event_mask() & Notifier::Read) - add_fd_to_set(notifier->fd(), rfds); - if (notifier->event_mask() & Notifier::Write) - add_fd_to_set(notifier->fd(), wfds); - if (notifier->event_mask() & Notifier::Exceptional) - VERIFY_NOT_REACHED(); - } + for (auto& notifier : *s_notifiers) { + if (notifier->event_mask() & Notifier::Read) + add_fd_to_set(notifier->fd(), rfds); + if (notifier->event_mask() & Notifier::Write) + add_fd_to_set(notifier->fd(), wfds); + if (notifier->event_mask() & Notifier::Exceptional) + VERIFY_NOT_REACHED(); } bool queued_events_is_empty; @@ -704,7 +713,6 @@ try_select_again: if (!marked_fd_count) return; - Threading::MutexLocker locker(s_notifiers_mutex); for (auto& notifier : *s_notifiers) { if (FD_ISSET(notifier->fd(), &rfds)) { if (notifier->event_mask() & Notifier::Event::Read) @@ -752,7 +760,7 @@ int EventLoop::register_timer(Object& object, int milliseconds, bool should_relo timer->reload(Time::now_monotonic_coarse()); timer->should_reload = should_reload; timer->fire_when_not_visible = fire_when_not_visible; - int timer_id = s_id_allocator->allocate(); + int timer_id = s_id_allocator.with_locked([](auto& allocator) { return allocator->allocate(); }); timer->timer_id = timer_id; s_timers->set(timer_id, move(timer)); return timer_id; @@ -760,7 +768,7 @@ int EventLoop::register_timer(Object& object, int milliseconds, bool should_relo bool EventLoop::unregister_timer(int timer_id) { - s_id_allocator->deallocate(timer_id); + s_id_allocator.with_locked([&](auto& allocator) { allocator->deallocate(timer_id); }); auto it = s_timers->find(timer_id); if (it == s_timers->end()) return false; @@ -770,13 +778,11 @@ bool EventLoop::unregister_timer(int timer_id) void EventLoop::register_notifier(Badge, Notifier& notifier) { - Threading::MutexLocker locker(s_notifiers_mutex); s_notifiers->set(¬ifier); } void EventLoop::unregister_notifier(Badge, Notifier& notifier) { - Threading::MutexLocker locker(s_notifiers_mutex); s_notifiers->remove(¬ifier); } diff --git a/Userland/Libraries/LibCore/EventLoop.h b/Userland/Libraries/LibCore/EventLoop.h index 672b8b3f2d..e2de367f48 100644 --- a/Userland/Libraries/LibCore/EventLoop.h +++ b/Userland/Libraries/LibCore/EventLoop.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2018-2020, Andreas Kling + * Copyright (c) 2022, kleines Filmröllchen * * SPDX-License-Identifier: BSD-2-Clause */ @@ -18,11 +19,14 @@ #include #include #include +#include #include #include namespace Core { +static Threading::MutexProtected s_main_event_loop; + class EventLoop { public: enum class MakeInspectable { @@ -48,7 +52,14 @@ public: void post_event(Object& receiver, NonnullOwnPtr&&); - static EventLoop& main(); + template + static decltype(auto) with_main_locked(Callback callback) + { + return s_main_event_loop.with_locked([&callback](auto*& event_loop) { + VERIFY(event_loop != nullptr); + return callback(event_loop); + }); + } static EventLoop& current(); bool was_exit_requested() const { return m_exit_requested; } @@ -111,7 +122,7 @@ private: bool m_exit_requested { false }; int m_exit_code { 0 }; - static int s_wake_pipe_fds[2]; + static thread_local int s_wake_pipe_fds[2]; struct Private; NonnullOwnPtr m_private;