1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-07-27 07:57:46 +00:00

Kernel: Move block condition evaluation out of the Scheduler

This makes the Scheduler a lot leaner by not having to evaluate
block conditions every time it is invoked. Instead evaluate them as
the states change, and unblock threads at that point.

This also implements some more waitid/waitpid/wait features and
behavior. For example, WUNTRACED and WNOWAIT are now supported. And
wait will now not return EINTR when SIGCHLD is delivered at the
same time.
This commit is contained in:
Tom 2020-11-29 16:05:27 -07:00 committed by Andreas Kling
parent 6a620562cc
commit 046d6855f5
53 changed files with 2027 additions and 930 deletions

View file

@ -95,6 +95,11 @@ FIFO::FIFO(uid_t uid)
LOCKER(all_fifos().lock());
all_fifos().resource().set(this);
m_fifo_id = ++s_next_fifo_id;
// Use the same block condition for read and write
m_buffer.set_unblock_callback([this]() {
evaluate_block_conditions();
});
}
FIFO::~FIFO()
@ -116,6 +121,8 @@ void FIFO::attach(Direction direction)
klog() << "open writer (" << m_writers << ")";
#endif
}
evaluate_block_conditions();
}
void FIFO::detach(Direction direction)
@ -133,6 +140,8 @@ void FIFO::detach(Direction direction)
ASSERT(m_writers);
--m_writers;
}
evaluate_block_conditions();
}
bool FIFO::can_read(const FileDescription&, size_t) const

View file

@ -31,6 +31,7 @@
namespace Kernel {
// Base-class constructor: binds the per-file block condition to this File
// so threads blocked on it can be re-evaluated when the file's state changes.
File::File()
: m_block_condition(*this)
{
}

View file

@ -39,6 +39,36 @@
namespace Kernel {
class File;
// Block condition shared by all blockers waiting on a File (read/write/accept/...).
// Blockers are only added while they cannot yet be unblocked; unblock() re-checks
// all registered blockers when the file's state changes.
class FileBlockCondition : public Thread::BlockCondition {
public:
FileBlockCondition(File& file)
: m_file(file)
{
}
// Called when a thread is about to block. Returns false (don't add) if the
// blocker can already be satisfied; the 'true' argument marks this as a
// pre-block check so the blocker doesn't fully unblock yet.
virtual bool should_add_blocker(Thread::Blocker& b, void* data) override
{
ASSERT(b.blocker_type() == Thread::Blocker::Type::File);
auto& blocker = static_cast<Thread::FileBlocker&>(b);
return !blocker.unblock(true, data);
}
// Re-evaluate all registered blockers; any whose condition is now met
// are removed and their threads woken.
void unblock()
{
ScopedSpinLock lock(m_lock);
do_unblock([&](auto& b, void* data) {
ASSERT(b.blocker_type() == Thread::Blocker::Type::File);
auto& blocker = static_cast<Thread::FileBlocker&>(b);
return blocker.unblock(false, data);
});
}
private:
// NOTE(review): m_file is not used in the visible code — presumably kept
// for future use or debugging; confirm before removing.
File& m_file;
};
// File is the base class for anything that can be referenced by a FileDescription.
//
// The most important functions in File are:
@ -103,8 +133,27 @@ public:
virtual bool is_character_device() const { return false; }
virtual bool is_socket() const { return false; }
virtual FileBlockCondition& block_condition() { return m_block_condition; }
protected:
File();
// Re-evaluate this file's block condition and wake any now-satisfiable
// blocked threads. Safe to call from IRQ context: unblocking must not
// happen in an IRQ handler, so the work is deferred to a later,
// non-IRQ context in that case.
void evaluate_block_conditions()
{
if (Processor::current().in_irq()) {
// If called from an IRQ handler we need to delay evaluation
// and unblocking of waiting threads
Processor::deferred_call_queue([this]() {
ASSERT(!Processor::current().in_irq());
evaluate_block_conditions();
});
} else {
block_condition().unblock();
}
}
private:
FileBlockCondition m_block_condition;
};
}

View file

@ -75,6 +75,26 @@ FileDescription::~FileDescription()
m_inode = nullptr;
}
// Given the set of conditions a blocker is waiting for, return the subset
// that is currently satisfied on this description (readable, writable,
// socket accept/connect complete). Exception conditions are not yet handled.
Thread::FileBlocker::BlockFlags FileDescription::should_unblock(Thread::FileBlocker::BlockFlags block_flags) const
{
u32 unblock_flags = (u32)Thread::FileBlocker::BlockFlags::None;
if (((u32)block_flags & (u32)Thread::FileBlocker::BlockFlags::Read) && can_read())
unblock_flags |= (u32)Thread::FileBlocker::BlockFlags::Read;
if (((u32)block_flags & (u32)Thread::FileBlocker::BlockFlags::Write) && can_write())
unblock_flags |= (u32)Thread::FileBlocker::BlockFlags::Write;
// TODO: Implement Thread::FileBlocker::BlockFlags::Exception
if ((u32)block_flags & (u32)Thread::FileBlocker::BlockFlags::SocketFlags) {
// Socket-specific conditions; the caller must only set these on sockets.
auto* sock = socket();
ASSERT(sock);
if (((u32)block_flags & (u32)Thread::FileBlocker::BlockFlags::Accept) && sock->can_accept())
unblock_flags |= (u32)Thread::FileBlocker::BlockFlags::Accept;
if (((u32)block_flags & (u32)Thread::FileBlocker::BlockFlags::Connect) && sock->setup_state() == Socket::SetupState::Completed)
unblock_flags |= (u32)Thread::FileBlocker::BlockFlags::Connect;
}
return (Thread::FileBlocker::BlockFlags)unblock_flags;
}
KResult FileDescription::stat(::stat& buffer)
{
LOCKER(m_lock);
@ -113,6 +133,7 @@ off_t FileDescription::seek(off_t offset, int whence)
// FIXME: Return -EINVAL if attempting to seek past the end of a seekable device.
m_current_offset = new_offset;
evaluate_block_conditions();
return m_current_offset;
}
@ -124,8 +145,11 @@ KResultOr<size_t> FileDescription::read(UserOrKernelBuffer& buffer, size_t count
if (new_offset.has_overflow())
return -EOVERFLOW;
auto nread_or_error = m_file->read(*this, offset(), buffer, count);
if (!nread_or_error.is_error() && m_file->is_seekable())
m_current_offset += nread_or_error.value();
if (!nread_or_error.is_error()) {
if (m_file->is_seekable())
m_current_offset += nread_or_error.value();
evaluate_block_conditions();
}
return nread_or_error;
}
@ -137,8 +161,11 @@ KResultOr<size_t> FileDescription::write(const UserOrKernelBuffer& data, size_t
if (new_offset.has_overflow())
return -EOVERFLOW;
auto nwritten_or_error = m_file->write(*this, offset(), data, size);
if (!nwritten_or_error.is_error() && m_file->is_seekable())
m_current_offset += nwritten_or_error.value();
if (!nwritten_or_error.is_error()) {
if (m_file->is_seekable())
m_current_offset += nwritten_or_error.value();
evaluate_block_conditions();
}
return nwritten_or_error;
}
@ -340,4 +367,9 @@ KResult FileDescription::chown(uid_t uid, gid_t gid)
return m_file->chown(*this, uid, gid);
}
// All descriptions of the same File share the File's block condition,
// so a state change on the file wakes waiters on every description.
FileBlockCondition& FileDescription::block_condition()
{
return m_file->block_condition();
}
}

View file

@ -45,6 +45,8 @@ public:
static NonnullRefPtr<FileDescription> create(File&);
~FileDescription();
Thread::FileBlocker::BlockFlags should_unblock(Thread::FileBlocker::BlockFlags) const;
bool is_readable() const { return m_readable; }
bool is_writable() const { return m_writable; }
@ -130,11 +132,18 @@ public:
KResult chown(uid_t, gid_t);
FileBlockCondition& block_condition();
private:
friend class VFS;
explicit FileDescription(File&);
FileDescription(FIFO&, FIFO::Direction);
// Wake any threads blocked on this description whose condition
// (read/write/etc.) is now satisfied.
void evaluate_block_conditions()
{
block_condition().unblock();
}
RefPtr<Custody> m_custody;
RefPtr<Inode> m_inode;
NonnullRefPtr<File> m_file;

View file

@ -47,8 +47,10 @@ InodeFile::~InodeFile()
KResultOr<size_t> InodeFile::read(FileDescription& description, size_t offset, UserOrKernelBuffer& buffer, size_t count)
{
ssize_t nread = m_inode->read_bytes(offset, count, buffer, &description);
if (nread > 0)
if (nread > 0) {
Thread::current()->did_file_read(nread);
evaluate_block_conditions();
}
if (nread < 0)
return KResult(nread);
return nread;
@ -60,6 +62,7 @@ KResultOr<size_t> InodeFile::write(FileDescription& description, size_t offset,
if (nwritten > 0) {
m_inode->set_mtime(kgettimeofday().tv_sec);
Thread::current()->did_file_write(nwritten);
evaluate_block_conditions();
}
if (nwritten < 0)
return KResult(nwritten);

View file

@ -78,6 +78,7 @@ KResultOr<size_t> InodeWatcher::read(FileDescription&, size_t, UserOrKernelBuffe
});
if (nwritten < 0)
return KResult(nwritten);
evaluate_block_conditions();
return bytes_to_write;
}
@ -97,18 +98,21 @@ void InodeWatcher::notify_inode_event(Badge<Inode>, Event::Type event_type)
{
LOCKER(m_lock);
m_queue.enqueue({ event_type });
evaluate_block_conditions();
}
// Queue a ChildAdded event for the watched inode and wake any thread
// blocked waiting to read events from this watcher.
void InodeWatcher::notify_child_added(Badge<Inode>, const InodeIdentifier& child_id)
{
LOCKER(m_lock);
m_queue.enqueue({ Event::Type::ChildAdded, child_id.index() });
evaluate_block_conditions();
}
// Queue a ChildRemoved event for the watched inode and wake any thread
// blocked waiting to read events from this watcher.
void InodeWatcher::notify_child_removed(Badge<Inode>, const InodeIdentifier& child_id)
{
LOCKER(m_lock);
m_queue.enqueue({ Event::Type::ChildRemoved, child_id.index() });
evaluate_block_conditions();
}
}

View file

@ -36,6 +36,7 @@ NonnullRefPtr<Plan9FS> Plan9FS::create(FileDescription& file_description)
// Construct the filesystem over an already-open transport description;
// the completion blocker is bound back to this filesystem so it can
// consult completion state when (un)blocking waiters.
Plan9FS::Plan9FS(FileDescription& file_description)
: FileBackedFS(file_description)
, m_completion_blocker(*this)
{
}
@ -216,6 +217,8 @@ private:
bool Plan9FS::initialize()
{
ensure_thread();
Message version_message { *this, Message::Type::Tversion };
version_message << (u32)m_max_message_size << "9P2000.L";
@ -412,7 +415,97 @@ const KBuffer& Plan9FS::Message::build()
return m_built.buffer;
}
KResult Plan9FS::post_message(Message& message)
// A ReceiveCompletion tracks one outstanding request, identified by its 9P tag.
Plan9FS::ReceiveCompletion::ReceiveCompletion(u16 tag)
: tag(tag)
{
}
// Out-of-line destructor (OwnPtr<Message> member needs the full Message type here).
Plan9FS::ReceiveCompletion::~ReceiveCompletion()
{
}
// Attempt to unblock this waiter because the message with 'tag' completed.
// Returns false if this blocker was already unblocked or waits on a
// different tag; otherwise takes the completed message and wakes the thread.
bool Plan9FS::Blocker::unblock(u16 tag)
{
{
ScopedSpinLock lock(m_lock);
if (m_did_unblock)
return false;
// NOTE(review): m_did_unblock is set before the tag comparison below, so a
// mismatched tag still marks this blocker as unblocked — confirm that callers
// only invoke this with the blocker's own tag, or that this is intentional.
m_did_unblock = true;
if (m_completion->tag != tag)
return false;
if (!m_completion->result.is_error())
m_message = move(*m_completion->message);
}
return unblock();
}
// Called when the thread ends up not blocking (e.g. the condition was already
// met). If we haven't been unblocked yet, check whether the completion already
// finished so the result isn't lost.
void Plan9FS::Blocker::not_blocking(bool)
{
{
ScopedSpinLock lock(m_lock);
if (m_did_unblock)
return;
}
m_fs.m_completion_blocker.try_unblock(*this);
}
// Whether the request this blocker waits on has completed.
// The completion's own lock guards its 'completed' flag.
bool Plan9FS::Blocker::is_completed() const
{
ScopedSpinLock lock(m_completion->lock);
return m_completion->completed;
}
// Only register a blocker if its request hasn't already completed;
// otherwise there is nothing to wait for.
bool Plan9FS::Plan9FSBlockCondition::should_add_blocker(Thread::Blocker& b, void*)
{
// NOTE: m_lock is held already!
auto& blocker = static_cast<Blocker&>(b);
return !blocker.is_completed();
}
// Wake the waiter(s) whose request with the given 9P tag has completed.
void Plan9FS::Plan9FSBlockCondition::unblock_completed(u16 tag)
{
unblock([&](Thread::Blocker& b, void*) {
ASSERT(b.blocker_type() == Thread::Blocker::Type::Plan9FS);
auto& blocker = static_cast<Blocker&>(b);
return blocker.unblock(tag);
});
}
// Wake every waiter unconditionally — used when the receive thread dies
// and all outstanding requests are failed with an error.
void Plan9FS::Plan9FSBlockCondition::unblock_all()
{
BlockCondition::unblock_all([&](Thread::Blocker& b, void*) {
ASSERT(b.blocker_type() == Thread::Blocker::Type::Plan9FS);
auto& blocker = static_cast<Blocker&>(b);
return blocker.unblock();
});
}
// If the blocker's request already finished, unblock it immediately.
// Used for the race where completion happens before the thread blocks.
void Plan9FS::Plan9FSBlockCondition::try_unblock(Plan9FS::Blocker& blocker)
{
if (m_fs.is_complete(*blocker.completion())) {
ScopedSpinLock lock(m_lock);
blocker.unblock(blocker.completion()->tag);
}
}
// A completion is finished exactly when it has been removed from
// m_completions (the dispatcher removes it after filling in the result).
// m_lock orders that removal against reading 'completed' here.
bool Plan9FS::is_complete(const ReceiveCompletion& completion)
{
LOCKER(m_lock);
if (m_completions.contains(completion.tag)) {
// If it's still in the map then it can't be complete
ASSERT(!completion.completed);
return false;
}
// if it's not in the map anymore, it must be complete. But we MUST
// hold m_lock to be able to check completion.completed!
ASSERT(completion.completed);
return true;
}
KResult Plan9FS::post_message(Message& message, RefPtr<ReceiveCompletion> completion)
{
auto& buffer = message.build();
const u8* data = buffer.data();
@ -421,9 +514,21 @@ KResult Plan9FS::post_message(Message& message)
LOCKER(m_send_lock);
if (completion) {
// Save the completion record *before* we send the message. This
// ensures that it exists when the thread reads the response
LOCKER(m_lock);
auto tag = completion->tag;
m_completions.set(tag, completion.release_nonnull());
// TODO: What if there is a collision? Do we need to wait until
// the existing record with the tag completes before queueing
// this one?
}
while (size > 0) {
if (!description.can_write()) {
if (Thread::current()->block<Thread::WriteBlocker>(nullptr, description).was_interrupted())
auto unblock_flags = Thread::FileBlocker::BlockFlags::None;
if (Thread::current()->block<Thread::WriteBlocker>(nullptr, description, unblock_flags).was_interrupted())
return KResult(-EINTR);
}
auto data_buffer = UserOrKernelBuffer::for_kernel_buffer(const_cast<u8*>(data));
@ -443,7 +548,8 @@ KResult Plan9FS::do_read(u8* data, size_t size)
auto& description = file_description();
while (size > 0) {
if (!description.can_read()) {
if (Thread::current()->block<Thread::ReadBlocker>(nullptr, description).was_interrupted())
auto unblock_flags = Thread::FileBlocker::BlockFlags::None;
if (Thread::current()->block<Thread::ReadBlocker>(nullptr, description, unblock_flags).was_interrupted())
return KResult(-EINTR);
}
auto data_buffer = UserOrKernelBuffer::for_kernel_buffer(data);
@ -461,9 +567,6 @@ KResult Plan9FS::do_read(u8* data, size_t size)
KResult Plan9FS::read_and_dispatch_one_message()
{
ASSERT(m_someone_is_reading);
// That someone is us.
struct [[gnu::packed]] Header
{
u32 size;
@ -485,112 +588,42 @@ KResult Plan9FS::read_and_dispatch_one_message()
LOCKER(m_lock);
auto optional_completion = m_completions.get(header.tag);
if (!optional_completion.has_value()) {
if (m_tags_to_ignore.contains(header.tag)) {
m_tags_to_ignore.remove(header.tag);
} else {
dbg() << "Received a 9p message of type " << header.type << " with an unexpected tag " << header.tag << ", dropping";
}
return KSuccess;
if (optional_completion.has_value()) {
auto completion = optional_completion.value();
ScopedSpinLock lock(completion->lock);
completion->result = KSuccess;
completion->message = new Message { move(buffer) };
completion->completed = true;
m_completions.remove(header.tag);
m_completion_blocker.unblock_completed(header.tag);
} else {
dbg() << "Received a 9p message of type " << header.type << " with an unexpected tag " << header.tag << ", dropping";
}
ReceiveCompletion& completion = *optional_completion.value();
completion.result = KSuccess;
completion.message = Message { move(buffer) };
completion.completed = true;
m_completions.remove(header.tag);
return KSuccess;
}
// (Pre-change, scheduler-polled implementation.) Unblock either because our
// message arrived, or because nobody is currently reading the transport and
// we should become the reader. The atomic exchange elects exactly one reader.
bool Plan9FS::Blocker::should_unblock(Thread&)
{
if (m_completion.completed)
return true;
bool someone_else_is_reading = m_completion.fs.m_someone_is_reading.exchange(true);
if (!someone_else_is_reading) {
// We're gonna start reading ourselves; unblock.
return true;
}
return false;
}
// (Pre-change implementation.) Wait for the reply with 'tag': register a
// stack-allocated completion, block until either another reader hands us the
// reply or we are elected reader, then dispatch messages ourselves until our
// completion is filled. On read error, fail every outstanding completion.
// Returns -EINTR if the block was interrupted by a signal.
KResult Plan9FS::wait_for_specific_message(u16 tag, Message& out_message)
{
KResult result = KSuccess;
ReceiveCompletion completion { *this, out_message, result, false };
{
LOCKER(m_lock);
m_completions.set(tag, &completion);
}
// Block until either:
// * Someone else reads the message we're waiting for, and hands it to us;
// * Or we become the one to read and dispatch messages.
if (Thread::current()->block<Plan9FS::Blocker>(nullptr, completion).was_interrupted()) {
LOCKER(m_lock);
m_completions.remove(tag);
return KResult(-EINTR);
}
// See for which reason we woke up.
if (completion.completed) {
// Somebody else completed it for us; nothing further to do.
return result;
}
// We were elected reader: pump messages until ours completes or a read fails.
while (!completion.completed && result.is_success()) {
result = read_and_dispatch_one_message();
}
if (result.is_error()) {
// If we fail to read, wake up everyone with an error.
LOCKER(m_lock);
for (auto& it : m_completions) {
it.value->result = result;
it.value->completed = true;
}
m_completions.clear();
}
// Wake up someone else, if anyone is interested...
m_someone_is_reading = false;
// ...and return.
return result;
}
KResult Plan9FS::post_message_and_explicitly_ignore_reply(Message& message)
{
auto tag = message.tag();
{
LOCKER(m_lock);
m_tags_to_ignore.set(tag);
}
auto result = post_message(message);
if (result.is_error()) {
LOCKER(m_lock);
m_tags_to_ignore.remove(tag);
}
return result;
return post_message(message, {});
}
KResult Plan9FS::post_message_and_wait_for_a_reply(Message& message, bool auto_convert_error_reply_to_error)
KResult Plan9FS::post_message_and_wait_for_a_reply(Message& message)
{
auto request_type = message.type();
auto tag = message.tag();
auto result = post_message(message);
if (result.is_error())
return result;
result = wait_for_specific_message(tag, message);
auto completion = adopt(*new ReceiveCompletion(tag));
auto result = post_message(message, completion);
if (result.is_error())
return result;
if (Thread::current()->block<Plan9FS::Blocker>(nullptr, *this, message, completion).was_interrupted())
return KResult(-EINTR);
if (!auto_convert_error_reply_to_error)
return KSuccess;
if (completion->result.is_error()) {
dbg() << "Plan9FS: Message was aborted with error " << completion->result;
return KResult(-EIO);
}
auto reply_type = message.type();
@ -624,6 +657,39 @@ ssize_t Plan9FS::adjust_buffer_size(ssize_t size) const
return min(size, max_size);
}
// Dedicated receive thread: loops reading and dispatching 9P replies until
// shutdown is requested. On a read error, every outstanding completion is
// failed with that error and all waiters are woken before the thread exits.
void Plan9FS::thread_main()
{
dbg() << "Plan9FS: Thread running";
do {
auto result = read_and_dispatch_one_message();
if (result.is_error()) {
// If we fail to read, wake up everyone with an error.
LOCKER(m_lock);
for (auto& it : m_completions) {
it.value->result = result;
it.value->completed = true;
}
m_completions.clear();
m_completion_blocker.unblock_all();
dbg() << "Plan9FS: Thread terminating, error reading";
return;
}
} while (!m_thread_shutdown.load(AK::MemoryOrder::memory_order_relaxed));
dbg() << "Plan9FS: Thread terminating";
}
// Lazily start the receive thread; the exchange on m_thread_running ensures
// at most one thread is ever spawned even under concurrent callers.
// NOTE(review): the lambda captures by reference and implicitly captures
// 'this' while outliving this call — relies on the Plan9FS outliving the
// kernel process; confirm the shutdown path guarantees that.
void Plan9FS::ensure_thread()
{
ScopedSpinLock lock(m_thread_lock);
if (!m_thread_running.exchange(true, AK::MemoryOrder::memory_order_acq_rel)) {
Process::create_kernel_process(m_thread, "Plan9FS", [&]() {
thread_main();
m_thread_running.store(false, AK::MemoryOrder::memory_order_release);
});
}
}
Plan9FSInode::Plan9FSInode(Plan9FS& fs, u32 fid)
: Inode(fs, fid)
{

View file

@ -68,38 +68,86 @@ public:
private:
Plan9FS(FileDescription&);
struct ReceiveCompletion {
Plan9FS& fs;
Message& message;
KResult& result;
Atomic<bool> completed;
class Blocker;
// Block condition shared by all threads waiting for 9P replies on this
// filesystem; waiters are unblocked per-tag as replies arrive, or all at
// once when the receive thread fails.
class Plan9FSBlockCondition : public Thread::BlockCondition {
public:
Plan9FSBlockCondition(Plan9FS& fs)
: m_fs(fs)
{
}
void unblock_completed(u16);
void unblock_all();
void try_unblock(Blocker&);
protected:
virtual bool should_add_blocker(Thread::Blocker&, void*) override;
private:
Plan9FS& m_fs;
mutable SpinLock<u8> m_lock;
};
// Ref-counted record for one in-flight request, keyed by its 9P tag.
// 'lock' guards 'completed'/'message'/'result'; the reply message is
// stored here by the receive thread for the waiter to pick up.
struct ReceiveCompletion : public RefCounted<ReceiveCompletion> {
mutable SpinLock<u8> lock;
bool completed { false };
const u16 tag;
OwnPtr<Message> message;
KResult result { KSuccess };
ReceiveCompletion(u16 tag);
~ReceiveCompletion();
};
class Blocker final : public Thread::Blocker {
public:
Blocker(ReceiveCompletion& completion)
: m_completion(completion)
Blocker(Plan9FS& fs, Message& message, NonnullRefPtr<ReceiveCompletion> completion)
: m_fs(fs)
, m_message(message)
, m_completion(move(completion))
{
set_block_condition(fs.m_completion_blocker);
}
virtual bool should_unblock(Thread&) override;
virtual const char* state_string() const override { return "Waiting"; }
virtual Type blocker_type() const override { return Type::Plan9FS; }
virtual void not_blocking(bool) override;
const NonnullRefPtr<ReceiveCompletion>& completion() const { return m_completion; }
u16 tag() const { return m_completion->tag; }
bool is_completed() const;
bool unblock()
{
unblock_from_blocker();
return true;
}
bool unblock(u16 tag);
private:
ReceiveCompletion& m_completion;
Plan9FS& m_fs;
Message& m_message;
NonnullRefPtr<ReceiveCompletion> m_completion;
bool m_did_unblock { false };
};
friend class Blocker;
virtual const char* class_name() const override { return "Plan9FS"; }
KResult post_message(Message&);
bool is_complete(const ReceiveCompletion&);
KResult post_message(Message&, RefPtr<ReceiveCompletion>);
KResult do_read(u8* buffer, size_t);
KResult read_and_dispatch_one_message();
KResult wait_for_specific_message(u16 tag, Message& out_message);
KResult post_message_and_wait_for_a_reply(Message&, bool auto_convert_error_reply_to_error = true);
KResult post_message_and_wait_for_a_reply(Message&);
KResult post_message_and_explicitly_ignore_reply(Message&);
ProtocolVersion parse_protocol_version(const StringView&) const;
ssize_t adjust_buffer_size(ssize_t size) const;
void thread_main();
void ensure_thread();
RefPtr<Plan9FSInode> m_root_inode;
Atomic<u16> m_next_tag { (u16)-1 };
Atomic<u32> m_next_fid { 1 };
@ -108,9 +156,13 @@ private:
size_t m_max_message_size { 4 * KiB };
Lock m_send_lock { "Plan9FS send" };
Atomic<bool> m_someone_is_reading { false };
HashMap<u16, ReceiveCompletion*> m_completions;
HashTable<u16> m_tags_to_ignore;
Plan9FSBlockCondition m_completion_blocker;
HashMap<u16, NonnullRefPtr<ReceiveCompletion>> m_completions;
SpinLock<u8> m_thread_lock;
RefPtr<Thread> m_thread;
Atomic<bool> m_thread_running { false };
Atomic<bool> m_thread_shutdown { false };
};
class Plan9FSInode final : public Inode {