
Kernel: Fix some issues related to fixes and block conditions

Fix some problems with join blockers where the joining thread's block
condition was added twice, which led to a crash when trying to
unblock that condition a second time.

Deferred block condition evaluation by File objects was also not
properly keeping the File object alive, which led to random
crashes and corruption problems.

Other problems were caused by the fact that the Queued state didn't
handle signals/interruptions consistently. To solve these issues we
remove this state entirely, along with Thread::wait_on, and change
WaitQueue into a BlockCondition instead.

Also, deliver signals even if there isn't going to be a context switch
to another thread.

Fixes #4336 and #4330
Tom 2020-12-07 21:29:41 -07:00 committed by Andreas Kling
parent 0918d8b1f8
commit da5cc34ebb
22 changed files with 474 additions and 434 deletions
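
For callers, the most visible effect of this refactor is the blocking API change applied throughout the hunks below: a thread no longer parks itself on a queue via Thread::wait_on(); instead WaitQueue (now a Thread::BlockCondition) exposes wait_on() itself and blocks the current thread through the generic Thread::block<QueueBlocker>() machinery. A before/after sketch, taken from the SB16 and AsyncDeviceRequest call sites in this diff:

// Before: Thread::wait_on() moved the thread into the (now removed) Queued state.
Thread::current()->wait_on(m_irq_queue, "SB16");
Thread::current()->wait_on(m_queue, name(), Thread::BlockTimeout(false, timeout));

// After: waiting goes through the queue, which is now a Thread::BlockCondition.
m_irq_queue.wait_on(nullptr, "SB16");                            // block with no timeout
m_queue.wait_on(Thread::BlockTimeout(false, timeout), name());   // block with a timeout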


@ -45,6 +45,7 @@
#include <Kernel/Thread.h>
#include <Kernel/VM/MemoryManager.h>
#include <Kernel/VM/PageDirectory.h>
#include <Kernel/VM/ProcessPagingScope.h>
#include <LibC/mallocdefs.h>
//#define PAGE_FAULT_DEBUG
@ -1282,28 +1283,60 @@ const DescriptorTablePointer& Processor::get_gdtr()
return m_gdtr;
}
bool Processor::get_context_frame_ptr(Thread& thread, u32& frame_ptr, u32& eip, bool from_other_processor)
Vector<FlatPtr> Processor::capture_stack_trace(Thread& thread, size_t max_frames)
{
bool ret = true;
ScopedCritical critical;
auto& proc = Processor::current();
if (&thread == proc.current_thread()) {
ASSERT(thread.state() == Thread::Running);
asm volatile("movl %%ebp, %%eax"
: "=g"(frame_ptr));
FlatPtr frame_ptr = 0, eip = 0;
Vector<FlatPtr, 32> stack_trace;
auto walk_stack = [&](FlatPtr stack_ptr)
{
stack_trace.append(eip);
size_t count = 1;
while (stack_ptr) {
FlatPtr retaddr;
count++;
if (max_frames != 0 && count > max_frames)
break;
if (is_user_range(VirtualAddress(stack_ptr), sizeof(FlatPtr) * 2)) {
if (!copy_from_user(&retaddr, &((FlatPtr*)stack_ptr)[1]) || !retaddr)
break;
stack_trace.append(retaddr);
if (!copy_from_user(&stack_ptr, (FlatPtr*)stack_ptr))
break;
} else {
// If this triggered from another processor, we should never
// hit this code path because the other processor is still holding
// the scheduler lock, which should prevent us from switching
// contexts
ASSERT(!from_other_processor);
void* fault_at;
if (!safe_memcpy(&retaddr, &((FlatPtr*)stack_ptr)[1], sizeof(FlatPtr), fault_at) || !retaddr)
break;
stack_trace.append(retaddr);
if (!safe_memcpy(&stack_ptr, (FlatPtr*)stack_ptr, sizeof(FlatPtr), fault_at))
break;
}
}
};
auto capture_current_thread = [&]()
{
frame_ptr = (FlatPtr)__builtin_frame_address(0);
eip = (FlatPtr)__builtin_return_address(0);
walk_stack(frame_ptr);
};
// Since the thread may be running on another processor, there
// is a chance a context switch may happen while we're trying
// to get it. It also won't be entirely accurate and will merely
// reflect the status at the last context switch.
ScopedSpinLock lock(g_scheduler_lock);
if (thread.state() == Thread::Running) {
auto& proc = Processor::current();
if (&thread == proc.current_thread()) {
ASSERT(thread.state() == Thread::Running);
// Leave the scheduler lock. If we trigger page faults we may
// need to be preempted. Since this is our own thread it won't
// cause any problems as the stack won't change below this frame.
lock.unlock();
capture_current_thread();
} else if (thread.is_active()) {
ASSERT(thread.cpu() != proc.id());
// If this is the case, the thread is currently running
// on another processor. We can't trust the kernel stack as
@ -1313,23 +1346,57 @@ bool Processor::get_context_frame_ptr(Thread& thread, u32& frame_ptr, u32& eip,
smp_unicast(thread.cpu(),
[&]() {
dbg() << "CPU[" << Processor::current().id() << "] getting stack for cpu #" << proc.id();
// NOTE: Because we are holding the scheduler lock while
// waiting for this callback to finish, the current thread
// on the target processor cannot change
ret = get_context_frame_ptr(thread, frame_ptr, eip, true);
ProcessPagingScope paging_scope(thread.process());
auto& target_proc = Processor::current();
ASSERT(&target_proc != &proc);
ASSERT(&thread == target_proc.current_thread());
// NOTE: Because the other processor is still holding the
// scheduler lock while waiting for this callback to finish,
// the current thread on the target processor cannot change
// TODO: What to do about page faults here? We might deadlock
// because the other processor is still holding the
// scheduler lock...
capture_current_thread();
}, false);
} else {
switch (thread.state()) {
case Thread::Running:
ASSERT_NOT_REACHED(); // should have been handled above
case Thread::Runnable:
case Thread::Stopped:
case Thread::Blocked:
case Thread::Dying:
case Thread::Dead: {
// We need to retrieve ebp from what was last pushed to the kernel
// stack. Before switching out of that thread, its switch_context
// pushed the callee-saved registers, and the last of them happens
// to be ebp.
ProcessPagingScope paging_scope(thread.process());
auto& tss = thread.tss();
u32* stack_top = reinterpret_cast<u32*>(tss.esp);
frame_ptr = stack_top[0];
if (is_user_range(VirtualAddress(stack_top), sizeof(FlatPtr))) {
if (!copy_from_user(&frame_ptr, &((FlatPtr*)stack_top)[0]))
frame_ptr = 0;
} else {
void* fault_at;
if (!safe_memcpy(&frame_ptr, &((FlatPtr*)stack_top)[0], sizeof(FlatPtr), fault_at))
frame_ptr = 0;
}
eip = tss.eip;
// TODO: We need to leave the scheduler lock here, but we also
// need to prevent the target thread from being run while
// we walk the stack
lock.unlock();
walk_stack(frame_ptr);
break;
}
default:
dbg() << "Cannot capture stack trace for thread " << thread << " in state " << thread.state_string();
break;
}
}
return true;
return stack_trace;
}
extern "C" void enter_thread_context(Thread* from_thread, Thread* to_thread)
@ -1435,7 +1502,7 @@ extern "C" void context_first_init(Thread* from_thread, Thread* to_thread, TrapF
ASSERT(to_thread == Thread::current());
Scheduler::enter_current(*from_thread);
Scheduler::enter_current(*from_thread, true);
// Since we got here and don't have Scheduler::context_switch in the
// call stack (because this is the first time we switched into this


@ -1018,7 +1018,7 @@ public:
void switch_context(Thread*& from_thread, Thread*& to_thread);
[[noreturn]] static void assume_context(Thread& thread, u32 flags);
u32 init_context(Thread& thread, bool leave_crit);
static bool get_context_frame_ptr(Thread& thread, u32& frame_ptr, u32& eip, bool = false);
static Vector<FlatPtr> capture_stack_trace(Thread& thread, size_t max_frames = 0);
void set_thread_specific(u8* data, size_t len);
};


@ -74,7 +74,7 @@ auto AsyncDeviceRequest::wait(timeval* timeout) -> RequestWaitResult
auto request_result = get_request_result();
if (is_completed_result(request_result))
return { request_result, Thread::BlockResult::NotBlocked };
auto wait_result = Thread::current()->wait_on(m_queue, name(), Thread::BlockTimeout(false, timeout));
auto wait_result = m_queue.wait_on(Thread::BlockTimeout(false, timeout), name());
return { get_request_result(), wait_result };
}


@ -227,7 +227,7 @@ void SB16::handle_irq(const RegisterState&)
void SB16::wait_for_irq()
{
Thread::current()->wait_on(m_irq_queue, "SB16");
m_irq_queue.wait_on(nullptr, "SB16");
disable_irq();
}


@ -71,7 +71,7 @@ NonnullRefPtr<FileDescription> FIFO::open_direction_blocking(FIFO::Direction dir
if (m_writers == 0) {
locker.unlock();
Thread::current()->wait_on(m_write_open_queue, "FIFO");
m_write_open_queue.wait_on(nullptr, "FIFO");
locker.lock();
}
}
@ -81,7 +81,7 @@ NonnullRefPtr<FileDescription> FIFO::open_direction_blocking(FIFO::Direction dir
if (m_readers == 0) {
locker.unlock();
Thread::current()->wait_on(m_read_open_queue, "FIFO");
m_read_open_queue.wait_on(nullptr, "FIFO");
locker.lock();
}
}


@ -142,17 +142,24 @@ protected:
{
if (Processor::current().in_irq()) {
// If called from an IRQ handler we need to delay evaluation
// and unblocking of waiting threads
Processor::deferred_call_queue([this]() {
ASSERT(!Processor::current().in_irq());
evaluate_block_conditions();
// and unblocking of waiting threads. Note that this File
// instance may be deleted before the deferred call is executed!
Processor::deferred_call_queue([self = make_weak_ptr()]() {
if (auto file = self.strong_ref())
file->do_evaluate_block_conditions();
});
} else {
block_condition().unblock();
do_evaluate_block_conditions();
}
}
private:
ALWAYS_INLINE void do_evaluate_block_conditions()
{
ASSERT(!Processor::current().in_irq());
block_condition().unblock();
}
FileBlockCondition m_block_condition;
};
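
The weak-pointer capture above is the heart of the File lifetime fix described in the commit message: the old code captured this directly, so the deferred lambda could run after the File had been destroyed. Below is a minimal standalone sketch of the same pattern, deliberately using std::weak_ptr and a plain callback vector rather than the kernel's WeakPtr and Processor::deferred_call_queue, purely for illustration:

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct File {
    void do_evaluate_block_conditions() { std::cout << "evaluating block conditions\n"; }
};

// Stand-in for Processor::deferred_call_queue: callbacks run later, possibly
// after the File that queued them has already been destroyed.
std::vector<std::function<void()>> deferred_calls;

void queue_evaluation(const std::shared_ptr<File>& file)
{
    // Capture only a weak reference so the deferred call can detect whether
    // the File is still alive when it finally runs.
    deferred_calls.push_back([weak = std::weak_ptr<File>(file)] {
        if (auto strong = weak.lock())
            strong->do_evaluate_block_conditions();
        // If the File is already gone, this is a harmless no-op instead of a use-after-free.
    });
}

int main()
{
    auto file = std::make_shared<File>();
    queue_evaluation(file);
    // file.reset();  // uncomment to simulate deletion before the deferred calls run
    for (auto& call : deferred_calls)
        call();
}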


@ -78,7 +78,8 @@ void Lock::lock(Mode mode)
m_lock.store(false, AK::memory_order_release);
return;
}
} while (current_thread->wait_on(m_queue, m_name, nullptr, &m_lock, m_holder) == Thread::BlockResult::NotBlocked);
m_lock.store(false, AK::memory_order_release);
} while (m_queue.wait_on(nullptr, m_name) == Thread::BlockResult::NotBlocked);
} else {
// I don't know *who* is using "m_lock", so just yield.
Scheduler::yield_from_critical();
@ -114,7 +115,8 @@ void Lock::unlock()
return;
}
m_mode = Mode::Unlocked;
m_queue.wake_one(&m_lock);
m_lock.store(false, AK::memory_order_release);
m_queue.wake_one();
return;
}
// I don't know *who* is using "m_lock", so just yield.
@ -142,7 +144,8 @@ bool Lock::force_unlock_if_locked()
m_holder = nullptr;
m_mode = Mode::Unlocked;
m_times_locked = 0;
m_queue.wake_one(&m_lock);
m_lock.store(false, AK::memory_order_release);
m_queue.wake_one();
break;
}
// I don't know *who* is using "m_lock", so just yield.
@ -154,8 +157,7 @@ bool Lock::force_unlock_if_locked()
void Lock::clear_waiters()
{
ASSERT(m_mode != Mode::Shared);
ScopedCritical critical;
m_queue.clear();
m_queue.wake_all();
}
}


@ -419,7 +419,7 @@ void E1000NetworkAdapter::send_raw(ReadonlyBytes payload)
sti();
break;
}
Thread::current()->wait_on(m_wait_queue, "E1000NetworkAdapter");
m_wait_queue.wait_on(nullptr, "E1000NetworkAdapter");
}
#ifdef E1000_DEBUG
klog() << "E1000: Sent packet, status is now " << String::format("%b", descriptor.status) << "!";


@ -114,7 +114,7 @@ void NetworkTask_main(void*)
for (;;) {
size_t packet_size = dequeue_packet(buffer, buffer_size, packet_timestamp);
if (!packet_size) {
Thread::current()->wait_on(packet_wait_queue, "NetworkTask");
packet_wait_queue.wait_on(nullptr, "NetworkTask");
continue;
}
if (packet_size < sizeof(EthernetFrameHeader)) {


@ -750,6 +750,7 @@ void Process::terminate_due_to_signal(u8 signal)
{
ASSERT_INTERRUPTS_DISABLED();
ASSERT(signal < 32);
ASSERT(Process::current() == this);
dbg() << "Terminating " << *this << " due to signal " << signal;
m_termination_status = 0;
m_termination_signal = signal;


@ -69,7 +69,7 @@ KernelRng::KernelRng()
void KernelRng::wait_for_entropy()
{
if (!resource().is_ready()) {
Thread::current()->wait_on(m_seed_queue, "KernelRng");
m_seed_queue.wait_on(nullptr, "KernelRng");
}
}


@ -140,9 +140,7 @@ bool Scheduler::pick_next()
#ifdef SCHEDULER_RUNNABLE_DEBUG
dbg() << "Scheduler[" << Processor::current().id() << "]: Non-runnables:";
Scheduler::for_each_nonrunnable([&](Thread& thread) -> IterationDecision {
if (thread.state() == Thread::Queued)
dbg() << " " << String::format("%-12s", thread.state_string()) << " " << thread << " @ " << String::format("%w", thread.tss().cs) << ":" << String::format("%x", thread.tss().eip) << " Reason: " << (thread.wait_reason() ? thread.wait_reason() : "none");
else if (thread.state() == Thread::Dying)
if (thread.state() == Thread::Dying)
dbg() << " " << String::format("%-12s", thread.state_string()) << " " << thread << " @ " << String::format("%w", thread.tss().cs) << ":" << String::format("%x", thread.tss().eip) << " Finalizable: " << thread.is_finalizable();
else
dbg() << " " << String::format("%-12s", thread.state_string()) << " " << thread << " @ " << String::format("%w", thread.tss().cs) << ":" << String::format("%x", thread.tss().eip);
@ -324,14 +322,6 @@ bool Scheduler::context_switch(Thread* thread)
thread->did_schedule();
auto from_thread = Thread::current();
// Check if we have any signals we should deliver (even if we don't
// end up switching to another thread)
if (from_thread && from_thread->state() == Thread::Running && from_thread->pending_signals_for_state()) {
ScopedSpinLock lock(from_thread->get_lock());
from_thread->dispatch_one_pending_signal();
}
if (from_thread == thread)
return false;
@ -364,21 +354,31 @@ bool Scheduler::context_switch(Thread* thread)
// NOTE: from_thread at this point reflects the thread we were
// switched from, and thread reflects Thread::current()
enter_current(*from_thread);
enter_current(*from_thread, false);
ASSERT(thread == Thread::current());
return true;
}
void Scheduler::enter_current(Thread& prev_thread)
void Scheduler::enter_current(Thread& prev_thread, bool is_first)
{
ASSERT(g_scheduler_lock.is_locked());
ASSERT(g_scheduler_lock.own_lock());
prev_thread.set_active(false);
if (prev_thread.state() == Thread::Dying) {
// If the thread we switched from is marked as dying, then notify
// the finalizer. Note that as soon as we leave the scheduler lock
// the finalizer may free from_thread!
notify_finalizer();
} else if (!is_first) {
// Check if we have any signals we should deliver (even if we don't
// end up switching to another thread).
auto current_thread = Thread::current();
if (!current_thread->is_in_block()) {
ScopedSpinLock lock(current_thread->get_lock());
if (current_thread->state() == Thread::Running && current_thread->pending_signals_for_state()) {
current_thread->dispatch_one_pending_signal();
}
}
}
}


@ -61,7 +61,7 @@ public:
static bool donate_to_and_switch(Thread*, const char* reason);
static bool donate_to(RefPtr<Thread>&, const char* reason);
static bool context_switch(Thread*);
static void enter_current(Thread& prev_thread);
static void enter_current(Thread& prev_thread, bool is_first);
static void leave_on_first_switch(u32 flags);
static void prepare_after_exec();
static void prepare_for_idle_loop();


@ -143,6 +143,8 @@ void syscall_handler(TrapFrame* trap)
current_thread->tracer_trap(regs); // this triggers SIGTRAP and stops the thread!
}
current_thread->yield_if_stopped();
// Make sure SMAP protection is enabled on syscall entry.
clac();
@ -185,12 +187,16 @@ void syscall_handler(TrapFrame* trap)
u32 arg3 = regs.ebx;
regs.eax = Syscall::handle(regs, function, arg1, arg2, arg3);
process.big_lock().unlock();
if (auto* tracer = current_thread->tracer(); tracer && tracer->is_tracing_syscalls()) {
tracer->set_trace_syscalls(false);
current_thread->tracer_trap(regs); // this triggers SIGTRAP and stops the thread!
}
process.big_lock().unlock();
current_thread->yield_if_stopped();
current_thread->check_dispatch_pending_signal();
// Check if we're supposed to return to userspace or just die.
current_thread->die_if_needed();


@ -61,9 +61,8 @@ int Process::sys$futex(Userspace<const Syscall::SC_futex_params*> user_params)
timeout = Thread::BlockTimeout(true, &ts_abstimeout);
}
// FIXME: This is supposed to be interruptible by a signal, but right now WaitQueue cannot be interrupted.
WaitQueue& wait_queue = futex_queue((FlatPtr)params.userspace_address);
Thread::BlockResult result = Thread::current()->wait_on(wait_queue, "Futex", timeout);
Thread::BlockResult result = wait_queue.wait_on(timeout, "Futex");
if (result == Thread::BlockResult::InterruptedByTimeout) {
return -ETIMEDOUT;
}


@ -36,7 +36,7 @@ void FinalizerTask::spawn()
finalizer_thread, "FinalizerTask", [](void*) {
Thread::current()->set_priority(THREAD_PRIORITY_LOW);
for (;;) {
Thread::current()->wait_on(*g_finalizer_wait_queue, "FinalizerTask");
g_finalizer_wait_queue->wait_on(nullptr, "FinalizerTask");
if (g_finalizer_has_work.exchange(false, AK::MemoryOrder::memory_order_acq_rel) == true)
Thread::finalize_dying_threads();


@ -130,7 +130,7 @@ Thread::~Thread()
void Thread::unblock_from_blocker(Blocker& blocker)
{
ScopedSpinLock scheduler_lock(g_scheduler_lock);
ScopedSpinLock lock(m_lock);
ScopedSpinLock block_lock(m_block_lock);
if (m_blocker != &blocker)
return;
if (!is_stopped())
@ -140,7 +140,7 @@ void Thread::unblock_from_blocker(Blocker& blocker)
void Thread::unblock(u8 signal)
{
ASSERT(g_scheduler_lock.own_lock());
ASSERT(m_lock.own_lock());
ASSERT(m_block_lock.own_lock());
if (m_state != Thread::Blocked)
return;
ASSERT(m_blocker);
@ -167,7 +167,6 @@ void Thread::set_should_die()
// Remember that we should die instead of returning to
// the userspace.
{
ScopedSpinLock lock(g_scheduler_lock);
m_should_die = true;
@ -180,25 +179,16 @@ void Thread::set_should_die()
// the kernel stacks can clean up. We won't ever return back
// to user mode, though
resume_from_stopped();
} else if (state() == Queued) {
// m_queue can only be accessed safely if g_scheduler_lock is held!
if (m_queue) {
m_queue->dequeue(*this);
m_queue = nullptr;
// Wake the thread
wake_from_queue();
}
}
}
if (is_blocked()) {
ScopedSpinLock lock(m_lock);
ASSERT(m_blocker != nullptr);
ScopedSpinLock block_lock(m_block_lock);
if (m_blocker) {
// We're blocked in the kernel.
m_blocker->set_interrupted_by_death();
unblock();
}
}
}
void Thread::die_if_needed()
{
@ -222,7 +212,7 @@ void Thread::die_if_needed()
// actual context switch
u32 prev_flags;
Processor::current().clear_critical(prev_flags, false);
dbg() << "die_if_needed returned form clear_critical!!! in irq: " << Processor::current().in_irq();
dbg() << "die_if_needed returned from clear_critical!!! in irq: " << Processor::current().in_irq();
// We should never get here, but the scoped scheduler lock
// will be released by Scheduler::context_switch again
ASSERT_NOT_REACHED();
@ -237,6 +227,16 @@ void Thread::exit(void* exit_value)
die_if_needed();
}
void Thread::yield_while_not_holding_big_lock()
{
ASSERT(!g_scheduler_lock.own_lock());
u32 prev_flags;
u32 prev_crit = Processor::current().clear_critical(prev_flags, true);
Scheduler::yield();
// NOTE: We may be on a different CPU now!
Processor::current().restore_critical(prev_crit, prev_flags);
}
void Thread::yield_without_holding_big_lock()
{
ASSERT(!g_scheduler_lock.own_lock());
@ -298,10 +298,8 @@ const char* Thread::state_string() const
return "Dead";
case Thread::Stopped:
return "Stopped";
case Thread::Queued:
return "Queued";
case Thread::Blocked: {
ScopedSpinLock lock(m_lock);
ScopedSpinLock block_lock(m_block_lock);
ASSERT(m_blocker != nullptr);
return m_blocker->state_string();
}
@ -382,6 +380,29 @@ bool Thread::tick()
return --m_ticks_left;
}
void Thread::check_dispatch_pending_signal()
{
auto result = DispatchSignalResult::Continue;
{
ScopedSpinLock scheduler_lock(g_scheduler_lock);
if (pending_signals_for_state()) {
ScopedSpinLock lock(m_lock);
result = dispatch_one_pending_signal();
}
}
switch (result) {
case DispatchSignalResult::Yield:
yield_while_not_holding_big_lock();
break;
case DispatchSignalResult::Terminate:
process().die();
break;
default:
break;
}
}
bool Thread::has_pending_signal(u8 signal) const
{
ScopedSpinLock lock(g_scheduler_lock);
@ -424,11 +445,19 @@ void Thread::send_signal(u8 signal, [[maybe_unused]] Process* sender)
m_pending_signals |= 1 << (signal - 1);
m_have_any_unmasked_pending_signals.store(pending_signals_for_state() & ~m_signal_mask, AK::memory_order_release);
ScopedSpinLock lock(m_lock);
if (m_state == Stopped) {
if (pending_signals_for_state())
ScopedSpinLock lock(m_lock);
if (pending_signals_for_state()) {
#ifdef SIGNAL_DEBUG
dbg() << "Signal: Resuming stopped " << *this << " to deliver signal " << signal;
#endif
resume_from_stopped();
}
} else {
ScopedSpinLock block_lock(m_block_lock);
#ifdef SIGNAL_DEBUG
dbg() << "Signal: Unblocking " << *this << " to deliver signal " << signal;
#endif
unblock(signal);
}
}
@ -607,7 +636,7 @@ DispatchSignalResult Thread::dispatch_signal(u8 signal)
ASSERT(this == Thread::current());
#ifdef SIGNAL_DEBUG
klog() << "signal: dispatch signal " << signal << " to " << *this;
klog() << "signal: dispatch signal " << signal << " to " << *this << " state: " << state_string();
#endif
if (m_state == Invalid || !is_initialized()) {
@ -618,12 +647,18 @@ DispatchSignalResult Thread::dispatch_signal(u8 signal)
return DispatchSignalResult::Deferred;
}
if (is_stopped() && signal != SIGCONT && signal != SIGKILL && signal != SIGTRAP) {
#ifdef SIGNAL_DEBUG
klog() << "signal: " << *this << " is stopped, will handle signal " << signal << " when resumed";
#endif
return DispatchSignalResult::Deferred;
}
// if (is_stopped() && signal != SIGCONT && signal != SIGKILL && signal != SIGTRAP) {
//#ifdef SIGNAL_DEBUG
// klog() << "signal: " << *this << " is stopped, will handle signal " << signal << " when resumed";
//#endif
// return DispatchSignalResult::Deferred;
// }
// if (is_blocked()) {
//#ifdef SIGNAL_DEBUG
// klog() << "signal: " << *this << " is blocked, will handle signal " << signal << " when unblocking";
//#endif
// return DispatchSignalResult::Deferred;
// }
auto& action = m_signal_action_data[signal];
// FIXME: Implement SA_SIGINFO signal handlers.
@ -635,21 +670,18 @@ DispatchSignalResult Thread::dispatch_signal(u8 signal)
auto* thread_tracer = tracer();
if (signal == SIGSTOP || (thread_tracer && default_signal_action(signal) == DefaultSignalAction::DumpCore)) {
if (!is_stopped()) {
#ifdef SIGNAL_DEBUG
dbg() << "signal: signal " << signal << " stopping thread " << *this;
#endif
m_stop_signal = signal;
set_state(State::Stopped);
}
return DispatchSignalResult::Yield;
}
if (signal == SIGCONT && is_stopped()) {
if (signal == SIGCONT) {
#ifdef SIGNAL_DEBUG
dbg() << "signal: SIGCONT resuming " << *this << " from stopped";
dbg() << "signal: SIGCONT resuming " << *this;
#endif
resume_from_stopped();
} else {
if (thread_tracer != nullptr) {
// when a thread is traced, it should be stopped whenever it receives a signal
@ -873,13 +905,14 @@ void Thread::set_state(State new_state)
if (new_state == Blocked) {
// we should always have a Blocker while blocked
ScopedSpinLock block_lock(m_block_lock);
ASSERT(m_blocker != nullptr);
}
auto previous_state = m_state;
ScopedSpinLock thread_lock(m_lock);
if (previous_state == Invalid) {
// If we were *just* created, we may have already pending signals
ScopedSpinLock thread_lock(m_lock);
if (has_unmasked_pending_signals()) {
dbg() << "Dispatch pending signals to new thread " << *this;
dispatch_one_pending_signal();
@ -890,6 +923,7 @@ void Thread::set_state(State new_state)
#ifdef THREAD_DEBUG
dbg() << "Set Thread " << *this << " state to " << state_string();
#endif
thread_lock.unlock();
if (m_process->pid() != 0) {
update_state_for_thread(previous_state);
@ -906,7 +940,7 @@ void Thread::set_state(State new_state)
m_stop_state = previous_state != Running ? m_state : Runnable;
process().unblock_waiters(*this, Thread::WaitBlocker::UnblockFlags::Stopped, m_stop_signal);
} else if (m_state == Dying) {
ASSERT(previous_state != Queued);
ASSERT(previous_state != Blocked);
if (this != Thread::current() && is_finalizable()) {
// Some other thread set this thread to Dying, notify the
// finalizer right away as it can be cleaned up now
@ -979,34 +1013,13 @@ String Thread::backtrace_impl()
// If we're handling IRQs we can't really safely symbolicate
elf_bundle = process.elf_bundle();
}
auto stack_trace = Processor::capture_stack_trace(*this);
ProcessPagingScope paging_scope(process);
// To prevent a context switch involving this thread, which may happen
// on another processor, we need to acquire the scheduler lock while
// walking the stack
{
ScopedSpinLock lock(g_scheduler_lock);
FlatPtr stack_ptr, eip;
if (Processor::get_context_frame_ptr(*this, stack_ptr, eip)) {
recognized_symbols.append({ eip, symbolicate_kernel_address(eip) });
while (stack_ptr) {
FlatPtr retaddr;
if (is_user_range(VirtualAddress(stack_ptr), sizeof(FlatPtr) * 2)) {
if (!copy_from_user(&retaddr, &((FlatPtr*)stack_ptr)[1]))
break;
recognized_symbols.append({ retaddr, symbolicate_kernel_address(retaddr) });
if (!copy_from_user(&stack_ptr, (FlatPtr*)stack_ptr))
break;
for (auto& frame : stack_trace) {
if (is_user_range(VirtualAddress(frame), sizeof(FlatPtr) * 2)) {
recognized_symbols.append({ frame, symbolicate_kernel_address(frame) });
} else {
void* fault_at;
if (!safe_memcpy(&retaddr, &((FlatPtr*)stack_ptr)[1], sizeof(FlatPtr), fault_at))
break;
recognized_symbols.append({ retaddr, symbolicate_kernel_address(retaddr) });
if (!safe_memcpy(&stack_ptr, (FlatPtr*)stack_ptr, sizeof(FlatPtr), fault_at))
break;
}
}
recognized_symbols.append({ frame, symbolicate_kernel_address(frame) });
}
}
@ -1064,120 +1077,6 @@ const LogStream& operator<<(const LogStream& stream, const Thread& value)
return stream << value.process().name() << "(" << value.pid().value() << ":" << value.tid().value() << ")";
}
Thread::BlockResult Thread::wait_on(WaitQueue& queue, const char* reason, const BlockTimeout& timeout, Atomic<bool>* lock, RefPtr<Thread> beneficiary)
{
auto* current_thread = Thread::current();
RefPtr<Timer> timer;
bool block_finished = false;
bool did_timeout = false;
bool did_unlock;
{
ScopedCritical critical;
// We need to be in a critical section *and* then also acquire the
// scheduler lock. The only way acquiring the scheduler lock could
// block us is if another core were to be holding it, in which case
// we need to wait until the scheduler lock is released again
{
ScopedSpinLock sched_lock(g_scheduler_lock);
if (!timeout.is_infinite()) {
timer = TimerQueue::the().add_timer_without_id(timeout.clock_id(), timeout.absolute_time(), [&]() {
// NOTE: this may execute on the same or any other processor!
ScopedSpinLock lock(g_scheduler_lock);
if (!block_finished) {
did_timeout = true;
wake_from_queue();
}
});
if (!timer) {
if (lock)
*lock = false;
// We timed out already, don't block
return BlockResult::InterruptedByTimeout;
}
}
// m_queue can only be accessed safely if g_scheduler_lock is held!
m_queue = &queue;
if (!queue.enqueue(*current_thread)) {
// The WaitQueue was already requested to wake someone when
// nobody was waiting. So return right away as we shouldn't
// be waiting
// NOTE: Do not set lock to false in this case!
return BlockResult::NotBlocked;
}
if (lock)
*lock = false;
did_unlock = unlock_process_if_locked();
set_state(State::Queued);
m_wait_reason = reason;
// Yield and wait for the queue to wake us up again.
if (beneficiary)
Scheduler::donate_to(beneficiary, reason);
else
Scheduler::yield();
}
// We've unblocked, relock the process if needed and carry on.
relock_process(did_unlock);
// This looks counter productive, but we may not actually leave
// the critical section we just restored. It depends on whether
// we were in one while being called.
if (current_thread->should_die()) {
// We're being unblocked so that we can clean up. We shouldn't
// be in Dying state until we're about to return back to user mode
ASSERT(current_thread->state() == Thread::Running);
#ifdef THREAD_DEBUG
dbg() << "Dying thread " << *current_thread << " was unblocked";
#endif
}
}
BlockResult result(BlockResult::WokeNormally);
{
// To be able to look at m_wait_queue_node we once again need the
// scheduler lock, which is held when we insert into the queue
ScopedSpinLock sched_lock(g_scheduler_lock);
block_finished = true;
if (m_queue) {
ASSERT(m_queue == &queue);
// If our thread was still in the queue, we timed out
m_queue = nullptr;
if (queue.dequeue(*current_thread))
result = BlockResult::InterruptedByTimeout;
} else {
// Our thread was already removed from the queue. The only
// way this can happen if someone else is trying to kill us.
// In this case, the queue should not contain us anymore.
result = BlockResult::InterruptedByDeath;
}
}
if (timer && !did_timeout) {
// Cancel the timer while not holding any locks. This allows
// the timer function to complete before we remove it
// (e.g. if it's on another processor)
TimerQueue::the().cancel_timer(timer.release_nonnull());
}
return result;
}
void Thread::wake_from_queue()
{
ScopedSpinLock lock(g_scheduler_lock);
ASSERT(state() == State::Queued);
m_wait_reason = nullptr;
if (this != Thread::current())
set_state(State::Runnable);
else
set_state(State::Running);
}
RefPtr<Thread> Thread::from_tid(ThreadID tid)
{
RefPtr<Thread> found_thread;


@ -151,8 +151,7 @@ public:
Dying,
Dead,
Stopped,
Blocked,
Queued,
Blocked
};
class BlockResult {
@ -263,6 +262,7 @@ public:
File,
Plan9FS,
Join,
Queue,
Routing,
Sleep,
Wait
@ -418,21 +418,48 @@ public:
}
template<typename UnblockOne>
void unblock_all(UnblockOne unblock_one)
bool unblock_some(UnblockOne unblock_one)
{
ScopedSpinLock lock(m_lock);
do_unblock_all(unblock_one);
return do_unblock_some(unblock_one);
}
template<typename UnblockOne>
void do_unblock_all(UnblockOne unblock_one)
bool do_unblock_some(UnblockOne unblock_one)
{
ASSERT(m_lock.is_locked());
bool stop_iterating = false;
for (size_t i = 0; i < m_blockers.size() && !stop_iterating;) {
auto& info = m_blockers[i];
if (unblock_one(*info.blocker, info.data, stop_iterating)) {
m_blockers.remove(i);
continue;
}
i++;
}
return !stop_iterating;
}
template<typename UnblockOne>
bool unblock_all(UnblockOne unblock_one)
{
ScopedSpinLock lock(m_lock);
return do_unblock_all(unblock_one);
}
template<typename UnblockOne>
bool do_unblock_all(UnblockOne unblock_one)
{
ASSERT(m_lock.is_locked());
bool unblocked_any = false;
for (auto& info : m_blockers) {
bool did_unblock = unblock_one(*info.blocker, info.data);
unblocked_any |= did_unblock;
ASSERT(did_unblock);
}
m_blockers.clear();
return unblocked_any;
}
virtual bool should_add_blocker(Blocker&, void*) { return true; }
@ -466,6 +493,28 @@ public:
bool m_should_block { true };
};
class QueueBlocker : public Blocker {
public:
explicit QueueBlocker(WaitQueue&, const char* block_reason = nullptr);
virtual ~QueueBlocker();
virtual Type blocker_type() const override { return Type::Queue; }
virtual const char* state_string() const override { return m_block_reason ? m_block_reason : "Queue"; }
virtual void not_blocking(bool) override { }
virtual bool should_block() override
{
return m_should_block;
}
bool unblock();
protected:
const char* const m_block_reason;
bool m_should_block { true };
bool m_did_unblock { false };
};
class FileBlocker : public Blocker {
public:
enum class BlockFlags : u32 {
@ -587,7 +636,6 @@ public:
size_t collect_unblocked_flags();
FDVector& m_fds;
size_t m_registered_count { 0 };
bool m_did_unblock { false };
};
@ -660,7 +708,8 @@ public:
bool m_finalized { false };
};
KResult try_join(JoinBlocker& blocker)
template<typename AddBlockerHandler>
KResult try_join(AddBlockerHandler add_blocker)
{
if (Thread::current() == this)
return KResult(-EDEADLK);
@ -669,8 +718,7 @@ public:
if (!m_is_joinable || state() == Dead)
return KResult(-EINVAL);
bool added = m_join_condition.add_blocker(blocker, nullptr);
ASSERT(added);
add_blocker();
// From this point on the thread is no longer joinable by anyone
// else. It also means that if the join is timed, it becomes
@ -686,10 +734,10 @@ public:
bool is_stopped() const { return m_state == Stopped; }
bool is_blocked() const { return m_state == Blocked; }
bool has_blocker() const
bool is_in_block() const
{
ASSERT(m_lock.own_lock());
return m_blocker != nullptr;
ScopedSpinLock lock(m_block_lock);
return m_in_block;
}
const Blocker& blocker() const;
@ -711,49 +759,85 @@ public:
VirtualAddress thread_specific_data() const { return m_thread_specific_data; }
size_t thread_specific_region_size() const { return m_thread_specific_region_size; }
ALWAYS_INLINE void yield_if_stopped()
{
// If some thread stopped us, we need to yield to someone else
// We check this when entering/exiting a system call. A thread
// may continue to execute in user land until the next timer
// tick or entering the next system call, or if it's in kernel
// mode then we will intercept prior to returning back to user
// mode.
ScopedSpinLock lock(m_lock);
while (state() == Thread::Stopped) {
lock.unlock();
// We shouldn't be holding the big lock here
yield_while_not_holding_big_lock();
lock.lock();
}
}
template<typename T, class... Args>
[[nodiscard]] BlockResult block(const BlockTimeout& timeout, Args&&... args)
{
ScopedSpinLock scheduler_lock(g_scheduler_lock);
ScopedSpinLock lock(m_lock);
// We need to hold m_lock so that nobody can unblock a blocker as soon
ScopedSpinLock block_lock(m_block_lock);
// We need to hold m_block_lock so that nobody can unblock a blocker as soon
// as it is constructed and registered elsewhere
ASSERT(!m_in_block);
m_in_block = true;
T t(forward<Args>(args)...);
bool did_timeout = false;
RefPtr<Timer> timer;
{
// We should never be blocking a blocked (or otherwise non-active) thread.
ASSERT(state() == Thread::Running);
switch (state()) {
case Thread::Stopped:
// It's possible that we were requested to be stopped!
break;
case Thread::Running:
ASSERT(m_blocker == nullptr);
break;
default:
ASSERT_NOT_REACHED();
}
m_blocker = &t;
if (!t.should_block()) {
// Don't block if the wake condition is already met
t.not_blocking(false);
m_blocker = nullptr;
m_in_block = false;
return BlockResult::NotBlocked;
}
auto& block_timeout = t.override_timeout(timeout);
if (!block_timeout.is_infinite()) {
m_blocker_timeout = timer = TimerQueue::the().add_timer_without_id(block_timeout.clock_id(), block_timeout.absolute_time(), [&]() {
// Process::kill_all_threads may be called at any time, which will mark all
// threads to die. In that case
m_blocker_timeout = timer = TimerQueue::the().add_timer_without_id(block_timeout.clock_id(), block_timeout.absolute_time(), [this]() {
ASSERT(!g_scheduler_lock.own_lock());
ASSERT(!m_block_lock.own_lock());
// NOTE: this may execute on the same or any other processor!
ScopedSpinLock scheduler_lock(g_scheduler_lock);
ScopedSpinLock lock(m_lock);
if (m_blocker) {
{
ScopedSpinLock block_lock(m_block_lock);
if (!m_blocker)
return;
m_blocker_timeout = nullptr;
if (!is_stopped()) {
}
ScopedSpinLock scheduler_lock(g_scheduler_lock);
ScopedSpinLock block_lock(m_block_lock);
if (!this->is_stopped()) {
// Only unblock if we're not stopped. In either
// case the blocker should be marked as timed out
unblock();
}
}
});
if (!m_blocker_timeout) {
// Timeout is already in the past
t.not_blocking(true);
m_blocker = nullptr;
m_in_block = false;
return BlockResult::InterruptedByTimeout;
}
} else {
@ -765,29 +849,29 @@ public:
set_state(Thread::Blocked);
}
lock.unlock();
block_lock.unlock();
scheduler_lock.unlock();
// Yield to the scheduler, and wait for us to resume unblocked.
yield_without_holding_big_lock();
scheduler_lock.lock();
lock.lock();
bool is_stopped = false;
bool is_stopped = state() == Thread::Stopped;
{
if (t.was_interrupted_by_signal())
if (t.was_interrupted_by_signal()) {
ScopedSpinLock lock(m_lock);
dispatch_one_pending_signal();
}
auto current_state = state();
// We should no longer be blocked once we woke up, but we may be stopped
if (current_state == Stopped)
is_stopped = true;
else
ASSERT(current_state == Thread::Running);
ASSERT(state() == (is_stopped ? Thread::Stopped : Thread::Running));
ScopedSpinLock block_lock2(m_block_lock);
// Remove ourselves...
m_blocker = nullptr;
m_in_block = false;
if (timer && !m_blocker_timeout)
did_timeout = true;
}
@ -796,27 +880,23 @@ public:
// to clean up now while we're still holding m_lock
auto result = t.end_blocking({}, did_timeout); // calls was_unblocked internally
scheduler_lock.unlock();
if (timer && !did_timeout) {
// Cancel the timer while not holding any locks. This allows
// the timer function to complete before we remove it
// (e.g. if it's on another processor)
lock.unlock();
scheduler_lock.unlock();
TimerQueue::the().cancel_timer(timer.release_nonnull());
} else {
scheduler_lock.unlock();
}
if (is_stopped) {
// If we're stopped we need to yield
yield_without_holding_big_lock();
}
} else if (is_stopped) {
// If we're stopped we need to yield
yield_without_holding_big_lock();
}
return result;
}
BlockResult wait_on(WaitQueue& queue, const char* reason, const BlockTimeout& = nullptr, Atomic<bool>* lock = nullptr, RefPtr<Thread> beneficiary = {});
void wake_from_queue();
void unblock_from_blocker(Blocker&);
void unblock(u8 signal = 0);
@ -864,6 +944,7 @@ public:
DispatchSignalResult dispatch_one_pending_signal();
DispatchSignalResult try_dispatch_one_pending_signal(u8 signal);
DispatchSignalResult dispatch_signal(u8 signal);
void check_dispatch_pending_signal();
bool has_unmasked_pending_signals() const { return m_have_any_unmasked_pending_signals.load(AK::memory_order_consume); }
void terminate_due_to_signal(u8 signal);
bool should_ignore_signal(u8 signal) const;
@ -929,15 +1010,14 @@ public:
m_ipv4_socket_write_bytes += bytes;
}
const char* wait_reason() const
{
return m_wait_reason;
}
void set_active(bool active)
{
m_is_active.store(active, AK::memory_order_release);
}
bool is_active() const
{
return m_is_active.load(AK::MemoryOrder::memory_order_acquire);
}
bool is_finalizable() const
{
@ -946,7 +1026,7 @@ public:
// as the thread may not be in Running state but switching out.
// m_is_active is set to false once the context switch is
// complete and the thread is not executing on any processor.
if (m_is_active.load(AK::memory_order_consume))
if (m_is_active.load(AK::memory_order_acquire))
return false;
// We can't finalize until the thread is either detached or
// a join has started. We can't make m_is_joinable atomic
@ -1020,7 +1100,6 @@ public:
private:
IntrusiveListNode m_runnable_list_node;
IntrusiveListNode m_wait_queue_node;
private:
friend struct SchedulerData;
@ -1088,6 +1167,7 @@ private:
void reset_fpu_state();
mutable RecursiveSpinLock m_lock;
mutable RecursiveSpinLock m_block_lock;
NonnullRefPtr<Process> m_process;
ThreadID m_tid { -1 };
TSS32 m_tss;
@ -1106,8 +1186,6 @@ private:
SignalActionData m_signal_action_data[32];
Blocker* m_blocker { nullptr };
RefPtr<Timer> m_blocker_timeout;
const char* m_wait_reason { nullptr };
WaitQueue* m_queue { nullptr };
#ifdef LOCK_DEBUG
struct HoldingLockInfo {
@ -1152,11 +1230,13 @@ private:
bool m_dump_backtrace_on_finalization { false };
bool m_should_die { false };
bool m_initialized { false };
bool m_in_block { false };
Atomic<bool> m_have_any_unmasked_pending_signals { false };
OwnPtr<ThreadTracer> m_tracer;
void yield_without_holding_big_lock();
void yield_while_not_holding_big_lock();
void update_state_for_thread(Thread::State previous_state);
};


@ -81,11 +81,14 @@ Thread::JoinBlocker::JoinBlocker(Thread& joinee, KResult& try_join_result, void*
// We need to hold our lock to avoid a race where try_join succeeds
// but the joinee is joining immediately
ScopedSpinLock lock(m_lock);
try_join_result = joinee.try_join(*this);
m_join_error = try_join_result.is_error();
}
try_join_result = joinee.try_join([&]() {
if (!set_block_condition(joinee.m_join_condition))
m_should_block = false;
});
m_join_error = try_join_result.is_error();
if (m_join_error)
m_should_block = false;
}
}
void Thread::JoinBlocker::not_blocking(bool timeout_in_past)
@ -119,13 +122,36 @@ bool Thread::JoinBlocker::unblock(void* value, bool from_add_blocker)
return true;
}
Thread::QueueBlocker::QueueBlocker(WaitQueue& wait_queue, const char* block_reason)
: m_block_reason(block_reason)
{
if (!set_block_condition(wait_queue, Thread::current()))
m_should_block = false;
}
Thread::QueueBlocker::~QueueBlocker()
{
}
bool Thread::QueueBlocker::unblock()
{
{
ScopedSpinLock lock(m_lock);
if (m_did_unblock)
return false;
m_did_unblock = true;
}
unblock_from_blocker();
return true;
}
Thread::FileDescriptionBlocker::FileDescriptionBlocker(FileDescription& description, BlockFlags flags, BlockFlags& unblocked_flags)
: m_blocked_description(description)
, m_flags(flags)
, m_unblocked_flags(unblocked_flags)
{
m_unblocked_flags = BlockFlags::None;
if (!set_block_condition(description.block_condition()))
m_should_block = false;
}
@ -281,19 +307,13 @@ Thread::SelectBlocker::SelectBlocker(FDVector& fds)
continue;
if (!fd_entry.description->block_condition().add_blocker(*this, &fd_entry))
m_should_block = false;
m_registered_count++;
}
}
Thread::SelectBlocker::~SelectBlocker()
{
if (m_registered_count > 0) {
for (auto& fd_entry : m_fds) {
for (auto& fd_entry : m_fds)
fd_entry.description->block_condition().remove_blocker(*this, &fd_entry);
if (--m_registered_count == 0)
break;
}
}
}
void Thread::SelectBlocker::not_blocking(bool timeout_in_past)


@ -381,9 +381,7 @@ PageFaultResponse Region::handle_zero_fault(size_t page_index_in_region)
ASSERT_INTERRUPTS_DISABLED();
ASSERT(vmobject().is_anonymous());
sti();
LOCKER(vmobject().m_paging_lock);
cli();
auto& page_slot = physical_page_slot(page_index_in_region);


@ -31,126 +31,88 @@
namespace Kernel {
WaitQueue::WaitQueue()
bool WaitQueue::should_add_blocker(Thread::Blocker& b, void* data)
{
}
WaitQueue::~WaitQueue()
{
}
bool WaitQueue::enqueue(Thread& thread)
{
ScopedSpinLock queue_lock(m_lock);
ASSERT(data != nullptr); // Thread that is requesting to be blocked
ASSERT(m_lock.is_locked());
ASSERT(b.blocker_type() == Thread::Blocker::Type::Queue);
if (m_wake_requested) {
// wake_* was called when no threads were in the queue
// we shouldn't wait at all
m_wake_requested = false;
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": enqueue: wake_all pending";
dbg() << "WaitQueue @ " << this << ": do not block thread " << *static_cast<Thread*>(data) << ", wake was pending";
#endif
return false;
}
m_threads.append(thread);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue @ " << this << ": should block thread " << *static_cast<Thread*>(data);
#endif
return true;
}
bool WaitQueue::dequeue(Thread& thread)
void WaitQueue::wake_one()
{
ScopedSpinLock queue_lock(m_lock);
if (m_threads.contains(thread)) {
m_threads.remove(thread);
ScopedSpinLock lock(m_lock);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue @ " << this << ": wake_one";
#endif
bool did_unblock_one = do_unblock_some([&](Thread::Blocker& b, void* data, bool& stop_iterating) {
ASSERT(data);
ASSERT(b.blocker_type() == Thread::Blocker::Type::Queue);
auto& blocker = static_cast<Thread::QueueBlocker&>(b);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue @ " << this << ": wake_one unblocking " << *static_cast<Thread*>(data);
#endif
if (blocker.unblock()) {
stop_iterating = true;
return true;
}
return false;
}
void WaitQueue::wake_one(Atomic<bool>* lock)
{
ScopedSpinLock queue_lock(m_lock);
if (lock)
*lock = false;
if (m_threads.is_empty()) {
// Save the fact that a wake was requested
m_wake_requested = true;
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_one: nobody to wake, mark as pending";
#endif
return;
}
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_one:";
#endif
auto* thread = m_threads.take_first();
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_one: wake thread " << *thread;
#endif
thread->wake_from_queue();
m_wake_requested = false;
Scheduler::yield();
});
m_wake_requested = !did_unblock_one;
}
void WaitQueue::wake_n(u32 wake_count)
{
ScopedSpinLock queue_lock(m_lock);
if (m_threads.is_empty()) {
// Save the fact that a wake was requested
m_wake_requested = true;
if (wake_count == 0)
return; // should we assert instead?
ScopedSpinLock lock(m_lock);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_n: nobody to wake, mark as pending";
dbg() << "WaitQueue @ " << this << ": wake_n(" << wake_count << ")";
#endif
return;
bool did_unblock_some = do_unblock_some([&](Thread::Blocker& b, void* data, bool& stop_iterating) {
ASSERT(data);
ASSERT(b.blocker_type() == Thread::Blocker::Type::Queue);
auto& blocker = static_cast<Thread::QueueBlocker&>(b);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue @ " << this << ": wake_n unblocking " << *static_cast<Thread*>(data);
#endif
ASSERT(wake_count > 0);
if (blocker.unblock()) {
if (--wake_count == 0)
stop_iterating = true;
return true;
}
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_n: " << wake_count;
#endif
for (u32 i = 0; i < wake_count; ++i) {
Thread* thread = m_threads.take_first();
if (!thread)
break;
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_n: wake thread " << *thread;
#endif
thread->wake_from_queue();
}
m_wake_requested = false;
Scheduler::yield();
return false;
});
m_wake_requested = !did_unblock_some;
}
void WaitQueue::wake_all()
{
ScopedSpinLock queue_lock(m_lock);
if (m_threads.is_empty()) {
// Save the fact that a wake was requested
m_wake_requested = true;
ScopedSpinLock lock(m_lock);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_all: nobody to wake, mark as pending";
dbg() << "WaitQueue @ " << this << ": wake_all";
#endif
return;
}
bool did_unblock_any = do_unblock_all([&](Thread::Blocker& b, void* data) {
ASSERT(data);
ASSERT(b.blocker_type() == Thread::Blocker::Type::Queue);
auto& blocker = static_cast<Thread::QueueBlocker&>(b);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_all: ";
dbg() << "WaitQueue @ " << this << ": wake_all unblocking " << *static_cast<Thread*>(data);
#endif
while (!m_threads.is_empty()) {
Thread* thread = m_threads.take_first();
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": wake_all: wake thread " << *thread;
#endif
thread->wake_from_queue();
}
m_wake_requested = false;
Scheduler::yield();
}
void WaitQueue::clear()
{
ScopedSpinLock queue_lock(m_lock);
#ifdef WAITQUEUE_DEBUG
dbg() << "WaitQueue " << VirtualAddress(this) << ": clear";
#endif
m_threads.clear();
m_wake_requested = false;
return blocker.unblock();
});
m_wake_requested = !did_unblock_any;
}
}


@ -32,23 +32,22 @@
namespace Kernel {
class WaitQueue {
class WaitQueue : public Thread::BlockCondition {
public:
WaitQueue();
~WaitQueue();
SpinLock<u32>& get_lock() { return m_lock; }
bool enqueue(Thread&);
bool dequeue(Thread&);
void wake_one(Atomic<bool>* lock = nullptr);
void wake_one();
void wake_n(u32 wake_count);
void wake_all();
void clear();
template<class... Args>
Thread::BlockResult wait_on(const Thread::BlockTimeout& timeout, Args&&... args)
{
return Thread::current()->block<Thread::QueueBlocker>(timeout, *this, forward<Args>(args)...);
}
protected:
virtual bool should_add_blocker(Thread::Blocker& b, void* data) override;
private:
typedef IntrusiveList<Thread, &Thread::m_wait_queue_node> ThreadList;
ThreadList m_threads;
SpinLock<u32> m_lock;
bool m_wake_requested { false };
};