1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-07-26 07:17:35 +00:00

LibCore: Have IPC server connections queue up unsendable messages

If an IPC client is giving us EAGAIN when trying to send him a message,
we now queue up the messages inside the CoreIPCServer::Connection and
will retry flushing them on next post/receive.

This prevents WindowServer from freezing up when one of its clients is
not taking care of its incoming messages.
This commit is contained in:
Andreas Kling 2019-11-03 12:36:35 +01:00
parent 8c45891c80
commit 31c1b8ec3e

View file

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <AK/Queue.h>
#include <LibCore/CEvent.h> #include <LibCore/CEvent.h>
#include <LibCore/CEventLoop.h> #include <LibCore/CEventLoop.h>
#include <LibCore/CIODevice.h> #include <LibCore/CIODevice.h>
@ -70,7 +71,10 @@ namespace Server {
, m_client_id(client_id) , m_client_id(client_id)
{ {
add_child(socket); add_child(socket);
m_socket->on_ready_to_read = [this] { drain_client(); }; m_socket->on_ready_to_read = [this] {
drain_client();
flush_outgoing_messages();
};
#if defined(CIPC_DEBUG) #if defined(CIPC_DEBUG)
dbg() << "S: Created new Connection " << fd << client_id << " and said hello"; dbg() << "S: Created new Connection " << fd << client_id << " and said hello";
#endif #endif
@ -89,9 +93,16 @@ namespace Server {
#if defined(CIPC_DEBUG) #if defined(CIPC_DEBUG)
dbg() << "S: -> C " << int(message.type) << " extra " << extra_data.size(); dbg() << "S: -> C " << int(message.type) << " extra " << extra_data.size();
#endif #endif
if (try_send_message(message, extra_data))
return;
QueuedMessage queued_message { message, extra_data };
if (!extra_data.is_empty()) if (!extra_data.is_empty())
const_cast<ServerMessage&>(message).extra_size = extra_data.size(); queued_message.message.extra_size = extra_data.size();
m_queue.enqueue(move(queued_message));
}
bool try_send_message(const ServerMessage& message, const ByteBuffer& extra_data)
{
struct iovec iov[2]; struct iovec iov[2];
int iov_count = 1; int iov_count = 1;
@ -104,29 +115,36 @@ namespace Server {
++iov_count; ++iov_count;
} }
int nwritten = 0; int nwritten = writev(m_socket->fd(), iov, iov_count);
for (;;) { if (nwritten < 0) {
nwritten = writev(m_socket->fd(), iov, iov_count); switch (errno) {
if (nwritten < 0) { case EPIPE:
switch (errno) { dbgprintf("Connection::post_message: Disconnected from peer.\n");
case EPIPE: shutdown();
dbgprintf("Connection::post_message: Disconnected from peer.\n"); return false;
shutdown(); case EAGAIN:
return; #ifdef CIPC_DEBUG
case EAGAIN: dbg() << "EAGAIN when trying to send WindowServer message, queue size: " << m_queue.size();
// FIXME: It would be better to push these onto a queue so we can go back #endif
// to servicing other clients. return false;
sched_yield(); default:
continue; perror("Connection::post_message writev");
default: ASSERT_NOT_REACHED();
perror("Connection::post_message writev");
ASSERT_NOT_REACHED();
}
} }
break;
} }
ASSERT(nwritten == (int)(sizeof(message) + extra_data.size())); ASSERT(nwritten == (int)(sizeof(message) + extra_data.size()));
return true;
}
void flush_outgoing_messages()
{
while (!m_queue.is_empty()) {
auto& queued_message = m_queue.head();
if (!try_send_message(queued_message.message, queued_message.extra_data))
break;
m_queue.dequeue();
}
} }
void drain_client() void drain_client()
@ -209,6 +227,13 @@ namespace Server {
private: private:
RefPtr<CLocalSocket> m_socket; RefPtr<CLocalSocket> m_socket;
struct QueuedMessage {
ServerMessage message;
ByteBuffer extra_data;
};
Queue<QueuedMessage, 16> m_queue;
int m_client_id { -1 }; int m_client_id { -1 };
int m_client_pid { -1 }; int m_client_pid { -1 };
}; };