mirror of
				https://github.com/RGBCube/serenity
				synced 2025-10-31 14:52:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			132 lines
		
	
	
	
		
			3.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			132 lines
		
	
	
	
		
			3.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
 | |
|  *
 | |
|  * SPDX-License-Identifier: BSD-2-Clause
 | |
|  */
 | |
| 
 | |
| #include <AK/Vector.h>
 | |
| #include <LibCore/DeferredInvocationContext.h>
 | |
| #include <LibCore/EventLoopImplementation.h>
 | |
| #include <LibCore/EventReceiver.h>
 | |
| #include <LibCore/Promise.h>
 | |
| #include <LibCore/ThreadEventQueue.h>
 | |
| #include <LibThreading/Mutex.h>
 | |
| #include <errno.h>
 | |
| 
 | |
| namespace Core {
 | |
| 
 | |
| struct ThreadEventQueue::Private {
 | |
|     struct QueuedEvent {
 | |
|         AK_MAKE_NONCOPYABLE(QueuedEvent);
 | |
|         AK_MAKE_DEFAULT_MOVABLE(QueuedEvent);
 | |
| 
 | |
|     public:
 | |
|         QueuedEvent(EventReceiver& receiver, NonnullOwnPtr<Event> event)
 | |
|             : receiver(receiver)
 | |
|             , event(move(event))
 | |
|         {
 | |
|         }
 | |
| 
 | |
|         ~QueuedEvent() = default;
 | |
| 
 | |
|         WeakPtr<EventReceiver> receiver;
 | |
|         NonnullOwnPtr<Event> event;
 | |
|     };
 | |
| 
 | |
|     Threading::Mutex mutex;
 | |
|     Vector<QueuedEvent, 128> queued_events;
 | |
|     Vector<NonnullRefPtr<Promise<NonnullRefPtr<EventReceiver>>>, 16> pending_promises;
 | |
|     bool warned_promise_count { false };
 | |
| };
 | |
| 
 | |
| static thread_local ThreadEventQueue* s_current_thread_event_queue;
 | |
| 
 | |
| ThreadEventQueue& ThreadEventQueue::current()
 | |
| {
 | |
|     if (!s_current_thread_event_queue) {
 | |
|         // FIXME: Don't leak these.
 | |
|         s_current_thread_event_queue = new ThreadEventQueue;
 | |
|     }
 | |
|     return *s_current_thread_event_queue;
 | |
| }
 | |
| 
 | |
| ThreadEventQueue::ThreadEventQueue()
 | |
|     : m_private(make<Private>())
 | |
| {
 | |
| }
 | |
| 
 | |
| ThreadEventQueue::~ThreadEventQueue() = default;
 | |
| 
 | |
| void ThreadEventQueue::post_event(Core::EventReceiver& receiver, NonnullOwnPtr<Core::Event> event)
 | |
| {
 | |
|     {
 | |
|         Threading::MutexLocker lock(m_private->mutex);
 | |
|         m_private->queued_events.empend(receiver, move(event));
 | |
|     }
 | |
|     Core::EventLoopManager::the().did_post_event();
 | |
| }
 | |
| 
 | |
| void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<EventReceiver>>> promise)
 | |
| {
 | |
|     Threading::MutexLocker lock(m_private->mutex);
 | |
|     m_private->pending_promises.append(move(promise));
 | |
| }
 | |
| 
 | |
| void ThreadEventQueue::cancel_all_pending_jobs()
 | |
| {
 | |
|     Threading::MutexLocker lock(m_private->mutex);
 | |
|     for (auto const& promise : m_private->pending_promises)
 | |
|         promise->reject(Error::from_errno(ECANCELED));
 | |
| 
 | |
|     m_private->pending_promises.clear();
 | |
| }
 | |
| 
 | |
| size_t ThreadEventQueue::process()
 | |
| {
 | |
|     decltype(m_private->queued_events) events;
 | |
|     {
 | |
|         Threading::MutexLocker locker(m_private->mutex);
 | |
|         events = move(m_private->queued_events);
 | |
|         m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_rejected(); });
 | |
|     }
 | |
| 
 | |
|     size_t processed_events = 0;
 | |
|     for (size_t i = 0; i < events.size(); ++i) {
 | |
|         auto& queued_event = events.at(i);
 | |
|         auto receiver = queued_event.receiver.strong_ref();
 | |
|         auto& event = *queued_event.event;
 | |
| 
 | |
|         if (!receiver) {
 | |
|             switch (event.type()) {
 | |
|             case Event::Quit:
 | |
|                 VERIFY_NOT_REACHED();
 | |
|             default:
 | |
|                 // Receiver disappeared, drop the event on the floor.
 | |
|                 break;
 | |
|             }
 | |
|         } else if (event.type() == Event::Type::DeferredInvoke) {
 | |
|             static_cast<DeferredInvocationEvent&>(event).m_invokee();
 | |
|         } else {
 | |
|             NonnullRefPtr<EventReceiver> protector(*receiver);
 | |
|             receiver->dispatch_event(event);
 | |
|         }
 | |
|         ++processed_events;
 | |
|     }
 | |
| 
 | |
|     {
 | |
|         Threading::MutexLocker locker(m_private->mutex);
 | |
|         if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
 | |
|             m_private->warned_promise_count = true;
 | |
|             dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
 | |
|         }
 | |
|     }
 | |
|     return processed_events;
 | |
| }
 | |
| 
 | |
| bool ThreadEventQueue::has_pending_events() const
 | |
| {
 | |
|     Threading::MutexLocker locker(m_private->mutex);
 | |
|     return !m_private->queued_events.is_empty();
 | |
| }
 | |
| 
 | |
| }
 | 
