diff --git a/Kernel/Net/IPv4Socket.cpp b/Kernel/Net/IPv4Socket.cpp index 39a85db4cc..76eb82328f 100644 --- a/Kernel/Net/IPv4Socket.cpp +++ b/Kernel/Net/IPv4Socket.cpp @@ -212,7 +212,7 @@ ssize_t IPv4Socket::recvfrom(FileDescription& description, void* buffer, size_t } load_receive_deadline(); - current->block(Thread::BlockedReceive, description); + current->block(*new Thread::ThreadBlockerReceive(description)); LOCKER(lock()); if (!m_can_read) { diff --git a/Kernel/Net/TCPSocket.cpp b/Kernel/Net/TCPSocket.cpp index ad2f688495..4f6772491d 100644 --- a/Kernel/Net/TCPSocket.cpp +++ b/Kernel/Net/TCPSocket.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include Lockable>& TCPSocket::sockets_by_port() @@ -161,7 +162,7 @@ KResult TCPSocket::protocol_connect(FileDescription& description, ShouldBlock sh m_state = State::Connecting; if (should_block == ShouldBlock::Yes) { - current->block(Thread::BlockedConnect, description); + current->block(*new Thread::ThreadBlockerConnect(description)); ASSERT(is_connected()); return KSuccess; } diff --git a/Kernel/Process.cpp b/Kernel/Process.cpp index f89b5bc870..adc13eb16a 100644 --- a/Kernel/Process.cpp +++ b/Kernel/Process.cpp @@ -900,7 +900,7 @@ ssize_t Process::do_write(FileDescription& description, const u8* data, int data #ifdef IO_DEBUG dbgprintf("block write on %d\n", fd); #endif - current->block(Thread::State::BlockedWrite, description); + current->block(*new Thread::ThreadBlockerWrite(description)); } ssize_t rc = description.write(data + nwritten, data_size - nwritten); #ifdef IO_DEBUG @@ -962,7 +962,7 @@ ssize_t Process::sys$read(int fd, u8* buffer, ssize_t size) return -EBADF; if (description->is_blocking()) { if (!description->can_read()) { - current->block(Thread::State::BlockedRead, *description); + current->block(*new Thread::ThreadBlockerRead(*description)); if (current->m_was_interrupted_while_blocked) return -EINTR; } @@ -2122,7 +2122,7 @@ int Process::sys$accept(int accepting_socket_fd, sockaddr* address, socklen_t* a auto& socket = *accepting_socket_description->socket(); if (!socket.can_accept()) { if (accepting_socket_description->is_blocking()) { - current->block(Thread::State::BlockedAccept, *accepting_socket_description); + current->block(*new Thread::ThreadBlockerAccept(*accepting_socket_description)); if (current->m_was_interrupted_while_blocked) return -EINTR; } else { diff --git a/Kernel/Scheduler.cpp b/Kernel/Scheduler.cpp index 08aba31024..89dfcd22ba 100644 --- a/Kernel/Scheduler.cpp +++ b/Kernel/Scheduler.cpp @@ -51,6 +51,88 @@ void Scheduler::beep() s_beep_timeout = g_uptime + 100; } +Thread::ThreadBlockerFileDescription::ThreadBlockerFileDescription(const RefPtr& description) + : m_blocked_description(description) +{} + +RefPtr Thread::ThreadBlockerFileDescription::blocked_description() const +{ + return m_blocked_description; +} + +Thread::ThreadBlockerAccept::ThreadBlockerAccept(const RefPtr& description) + : ThreadBlockerFileDescription(description) +{ +} + +bool Thread::ThreadBlockerAccept::should_unblock(time_t, long) +{ + auto& description = *blocked_description(); + auto& socket = *description.socket(); + + return socket.can_accept(); +} + +Thread::ThreadBlockerReceive::ThreadBlockerReceive(const RefPtr& description) + : ThreadBlockerFileDescription(description) +{ +} + +bool Thread::ThreadBlockerReceive::should_unblock(time_t now_sec, long now_usec) +{ + auto& description = *blocked_description(); + auto& socket = *description.socket(); + // FIXME: Block until the amount of data wanted is available. + bool timed_out = now_sec > socket.receive_deadline().tv_sec || (now_sec == socket.receive_deadline().tv_sec && now_usec >= socket.receive_deadline().tv_usec); + if (timed_out || description.can_read()) + return true; + return false; +} + +Thread::ThreadBlockerConnect::ThreadBlockerConnect(const RefPtr& description) + : ThreadBlockerFileDescription(description) +{ +} + +bool Thread::ThreadBlockerConnect::should_unblock(time_t, long) +{ + auto& description = *blocked_description(); + auto& socket = *description.socket(); + return socket.is_connected(); +} + +Thread::ThreadBlockerWrite::ThreadBlockerWrite(const RefPtr& description) + : ThreadBlockerFileDescription(description) +{ +} + +bool Thread::ThreadBlockerWrite::should_unblock(time_t, long) +{ + return blocked_description()->can_write(); +} + +Thread::ThreadBlockerRead::ThreadBlockerRead(const RefPtr& description) + : ThreadBlockerFileDescription(description) +{ +} + +bool Thread::ThreadBlockerRead::should_unblock(time_t, long) +{ + // FIXME: Block until the amount of data wanted is available. + return blocked_description()->can_read(); +} + +Thread::ThreadBlockerCondition::ThreadBlockerCondition(Function &condition) + : m_block_until_condition(move(condition)) +{ + ASSERT(m_block_until_condition); +} + +bool Thread::ThreadBlockerCondition::should_unblock(time_t, long) +{ + return m_block_until_condition(); +} + // Called by the scheduler on threads that are blocked for some reason. // Make a decision as to whether to unblock them or not. void Thread::consider_unblock(time_t now_sec, long now_usec) @@ -93,41 +175,6 @@ void Thread::consider_unblock(time_t now_sec, long now_usec) return IterationDecision::Break; }); return; - case Thread::BlockedRead: - ASSERT(m_blocked_description); - // FIXME: Block until the amount of data wanted is available. - if (m_blocked_description->can_read()) - unblock(); - return; - case Thread::BlockedWrite: - ASSERT(m_blocked_description != -1); - if (m_blocked_description->can_write()) - unblock(); - return; - case Thread::BlockedConnect: { - auto& description = *m_blocked_description; - auto& socket = *description.socket(); - if (socket.is_connected()) - unblock(); - return; - } - case Thread::BlockedReceive: { - auto& description = *m_blocked_description; - auto& socket = *description.socket(); - // FIXME: Block until the amount of data wanted is available. - bool timed_out = now_sec > socket.receive_deadline().tv_sec || (now_sec == socket.receive_deadline().tv_sec && now_usec >= socket.receive_deadline().tv_usec); - if (timed_out || description.can_read()) - unblock(); - return; - } - case Thread::BlockedAccept: { - auto& description = *m_blocked_description; - auto& socket = *description.socket(); - - if (socket.can_accept()) - unblock(); - return; - } case Thread::BlockedSelect: if (m_select_has_timeout) { if (now_sec > m_select_timeout.tv_sec || (now_sec == m_select_timeout.tv_sec && now_usec >= m_select_timeout.tv_usec)) { @@ -149,9 +196,10 @@ void Thread::consider_unblock(time_t now_sec, long now_usec) } return; case Thread::BlockedCondition: - if (m_block_until_condition()) { - m_block_until_condition = nullptr; + ASSERT(m_blocker); + if (m_blocker->should_unblock(now_sec, now_usec)) { unblock(); + m_blocker = nullptr; } return; case Thread::Skip1SchedulerPass: diff --git a/Kernel/Thread.cpp b/Kernel/Thread.cpp index 1f1ef5fa41..c8e793e722 100644 --- a/Kernel/Thread.cpp +++ b/Kernel/Thread.cpp @@ -99,7 +99,7 @@ Thread::~Thread() void Thread::unblock() { - m_blocked_description = nullptr; + m_blocker = nullptr; if (current == this) { set_state(Thread::Running); return; @@ -110,7 +110,7 @@ void Thread::unblock() void Thread::block_until(Function&& condition) { - m_block_until_condition = move(condition); + m_blocker = make(condition); block(Thread::BlockedCondition); Scheduler::yield(); } @@ -129,10 +129,10 @@ void Thread::block(Thread::State new_state) process().big_lock().lock(); } -void Thread::block(Thread::State new_state, FileDescription& description) +void Thread::block(ThreadBlocker& blocker) { - m_blocked_description = &description; - block(new_state); + m_blocker = &blocker; + block(Thread::BlockedCondition); } void Thread::sleep(u32 ticks) @@ -165,22 +165,12 @@ const char* to_string(Thread::State state) return "Sleep"; case Thread::BlockedWait: return "Wait"; - case Thread::BlockedRead: - return "Read"; - case Thread::BlockedWrite: - return "Write"; case Thread::BlockedSignal: return "Signal"; case Thread::BlockedSelect: return "Select"; case Thread::BlockedLurking: return "Lurking"; - case Thread::BlockedConnect: - return "Connect"; - case Thread::BlockedReceive: - return "Receive"; - case Thread::BlockedAccept: - return "Accepting"; case Thread::BlockedCondition: return "Condition"; case Thread::__Begin_Blocked_States__: @@ -197,7 +187,7 @@ void Thread::finalize() dbgprintf("Finalizing Thread %u in %s(%u)\n", tid(), m_process.name().characters(), pid()); set_state(Thread::State::Dead); - m_blocked_description = nullptr; + m_blocker = nullptr; if (this == &m_process.main_thread()) m_process.finalize(); @@ -558,7 +548,7 @@ KResult Thread::wait_for_connect(FileDescription& description) auto& socket = *description.socket(); if (socket.is_connected()) return KSuccess; - block(Thread::State::BlockedConnect, description); + block(*new Thread::ThreadBlockerConnect(description)); Scheduler::yield(); if (!socket.is_connected()) return KResult(-ECONNREFUSED); diff --git a/Kernel/Thread.h b/Kernel/Thread.h index b1da922852..576d06d03e 100644 --- a/Kernel/Thread.h +++ b/Kernel/Thread.h @@ -67,17 +67,66 @@ public: BlockedLurking, BlockedSleep, BlockedWait, - BlockedRead, - BlockedWrite, BlockedSignal, BlockedSelect, - BlockedConnect, - BlockedReceive, - BlockedAccept, BlockedCondition, __End_Blocked_States__ }; + class ThreadBlocker { + public: + virtual ~ThreadBlocker() {} + virtual bool should_unblock(time_t now_s, long us) = 0; + }; + + class ThreadBlockerFileDescription : public ThreadBlocker { + public: + ThreadBlockerFileDescription(const RefPtr& description); + RefPtr blocked_description() const; + + private: + RefPtr m_blocked_description; + }; + + class ThreadBlockerAccept : public ThreadBlockerFileDescription { + public: + ThreadBlockerAccept(const RefPtr& description); + virtual bool should_unblock(time_t, long) override; + }; + + class ThreadBlockerReceive : public ThreadBlockerFileDescription { + public: + ThreadBlockerReceive(const RefPtr& description); + virtual bool should_unblock(time_t, long) override; + }; + + class ThreadBlockerConnect : public ThreadBlockerFileDescription { + public: + ThreadBlockerConnect(const RefPtr& description); + virtual bool should_unblock(time_t, long) override; + }; + + class ThreadBlockerWrite : public ThreadBlockerFileDescription { + public: + ThreadBlockerWrite(const RefPtr& description); + virtual bool should_unblock(time_t, long) override; + }; + + class ThreadBlockerRead : public ThreadBlockerFileDescription { + public: + ThreadBlockerRead(const RefPtr& description); + virtual bool should_unblock(time_t, long) override; + }; + + class ThreadBlockerCondition : public ThreadBlocker { + public: + ThreadBlockerCondition(Function &condition); + virtual bool should_unblock(time_t, long) override; + + private: + Function m_block_until_condition; + }; + void did_schedule() { ++m_times_scheduled; } u32 times_scheduled() const { return m_times_scheduled; } @@ -99,7 +148,7 @@ public: void sleep(u32 ticks); void block(Thread::State); - void block(Thread::State, FileDescription&); + void block(ThreadBlocker& blocker); void unblock(); void set_wakeup_time(u64 t) { m_wakeup_time = t; } @@ -186,11 +235,10 @@ private: RefPtr m_kernel_stack_for_signal_handler_region; pid_t m_waitee_pid { -1 }; int m_wait_options { 0 }; - RefPtr m_blocked_description; timeval m_select_timeout; SignalActionData m_signal_action_data[32]; Region* m_signal_stack_user_region { nullptr }; - Function m_block_until_condition; + OwnPtr m_blocker; Vector m_select_read_fds; Vector m_select_write_fds; Vector m_select_exceptional_fds;