1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-07-27 18:27:35 +00:00

LibCore+LibIPC+Everywhere: Return Stream::LocalSocket from LocalServer

This change unfortunately cannot be atomically made without a single
commit changing everything.

Most of the important changes are in LibIPC/Connection.cpp,
LibIPC/ServerConnection.cpp and LibCore/LocalServer.cpp.

The notable changes are:
- IPCCompiler now generates the decode and decode_message functions such
  that they take a Core::Stream::LocalSocket instead of the socket fd.
- IPC::Decoder now uses the receive_fd method of LocalSocket instead of
  doing system calls directly on the fd.
- IPC::ConnectionBase and related classes now use the Stream API
  functions.
- IPC::ServerConnection no longer constructs the socket itself; instead,
  a convenience macro, IPC_CLIENT_CONNECTION, is used in place of
  C_OBJECT and will generate a static try_create factory function for
  the ServerConnection subclass. The subclass is now responsible for
  passing the socket constructed in this function to its
  ServerConnection base; the socket is passed as the first argument to
  the constructor (as a NonnullOwnPtr<Core::Stream::LocalServer>) before
  any other arguments.
- The functionality regarding taking over sockets from SystemServer has
  been moved to LibIPC/SystemServerTakeover.cpp. The Core::LocalSocket
  implementation of this functionality hasn't been deleted due to my
  intention of removing this class in the near future and to reduce
  noise on this (already quite noisy) PR.
This commit is contained in:
sin-ack 2022-01-14 13:12:49 +00:00 committed by Ali Mohammad Pur
parent 4cad0dd74c
commit 2e1bbcb0fa
94 changed files with 378 additions and 252 deletions

View file

@ -4,6 +4,7 @@ set(SOURCES
Encoder.cpp
Message.cpp
Stub.cpp
SystemServerTakeover.cpp
)
serenity_lib(LibIPC ipc)

View file

@ -24,12 +24,12 @@ public:
using ServerStub = typename ServerEndpoint::Stub;
using IPCProxy = typename ClientEndpoint::template Proxy<ServerEndpoint>;
ClientConnection(ServerStub& stub, NonnullRefPtr<Core::LocalSocket> socket, int client_id)
ClientConnection(ServerStub& stub, NonnullOwnPtr<Core::Stream::LocalSocket> socket, int client_id)
: IPC::Connection<ServerEndpoint, ClientEndpoint>(stub, move(socket))
, ClientEndpoint::template Proxy<ServerEndpoint>(*this, {})
, m_client_id(client_id)
{
VERIFY(this->socket().is_connected());
VERIFY(this->socket().is_open());
this->socket().on_ready_to_read = [this] {
// FIXME: Do something about errors.
(void)this->drain_messages_from_peer();

View file

@ -11,10 +11,9 @@
namespace IPC {
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullRefPtr<Core::LocalSocket> socket, u32 local_endpoint_magic)
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullOwnPtr<Core::Stream::LocalSocket> socket, u32 local_endpoint_magic)
: m_local_stub(local_stub)
, m_socket(move(socket))
, m_notifier(Core::Notifier::construct(m_socket->fd(), Core::Notifier::Read, this))
, m_local_endpoint_magic(local_endpoint_magic)
{
m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); });
@ -42,7 +41,7 @@ ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
#ifdef __serenity__
for (auto& fd : buffer.fds) {
if (auto result = Core::System::sendfd(m_socket->fd(), fd.value()); result.is_error()) {
if (auto result = m_socket->send_fd(fd.value()); result.is_error()) {
dbgln("{}", result.error());
shutdown();
return result;
@ -53,23 +52,29 @@ ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
warnln("fd passing is not supported on this platform, sorry :(");
#endif
size_t total_nwritten = 0;
while (total_nwritten < buffer.data.size()) {
auto nwritten = write(m_socket->fd(), buffer.data.data() + total_nwritten, buffer.data.size() - total_nwritten);
if (nwritten < 0) {
switch (errno) {
case EPIPE:
shutdown();
return Error::from_string_literal("IPC::Connection::post_message: Disconnected from peer"sv);
case EAGAIN:
shutdown();
return Error::from_string_literal("IPC::Connection::post_message: Peer buffer overflowed"sv);
default:
shutdown();
return Error::from_syscall("IPC::Connection::post_message write"sv, -errno);
ReadonlyBytes bytes_to_write { buffer.data.span() };
while (!bytes_to_write.is_empty()) {
auto maybe_nwritten = m_socket->write(bytes_to_write);
if (maybe_nwritten.is_error()) {
auto error = maybe_nwritten.release_error();
if (error.is_errno()) {
switch (error.code()) {
case EPIPE:
shutdown();
return Error::from_string_literal("IPC::Connection::post_message: Disconnected from peer"sv);
case EAGAIN:
shutdown();
return Error::from_string_literal("IPC::Connection::post_message: Peer buffer overflowed"sv);
default:
shutdown();
return Error::from_syscall("IPC::Connection::post_message write"sv, -error.code());
}
} else {
return error;
}
}
total_nwritten += nwritten;
bytes_to_write = bytes_to_write.slice(maybe_nwritten.value());
}
m_responsiveness_timer->start();
@ -78,7 +83,6 @@ ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
void ConnectionBase::shutdown()
{
m_notifier->close();
m_socket->close();
die();
}
@ -99,21 +103,14 @@ void ConnectionBase::handle_messages()
void ConnectionBase::wait_for_socket_to_become_readable()
{
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(m_socket->fd(), &read_fds);
for (;;) {
if (auto rc = select(m_socket->fd() + 1, &read_fds, nullptr, nullptr, nullptr); rc < 0) {
if (errno == EINTR)
continue;
perror("wait_for_specific_endpoint_message: select");
VERIFY_NOT_REACHED();
} else {
VERIFY(rc > 0);
VERIFY(FD_ISSET(m_socket->fd(), &read_fds));
break;
}
auto maybe_did_become_readable = m_socket->can_read_without_blocking(-1);
if (maybe_did_become_readable.is_error()) {
dbgln("ConnectionBase::wait_for_socket_to_become_readable: {}", maybe_did_become_readable.error());
warnln("ConnectionBase::wait_for_socket_to_become_readable: {}", maybe_did_become_readable.error());
VERIFY_NOT_REACHED();
}
VERIFY(maybe_did_become_readable.value());
}
ErrorOr<Vector<u8>> ConnectionBase::read_as_much_as_possible_from_socket_without_blocking()
@ -125,15 +122,21 @@ ErrorOr<Vector<u8>> ConnectionBase::read_as_much_as_possible_from_socket_without
m_unprocessed_bytes.clear();
}
u8 buffer[4096];
while (m_socket->is_open()) {
u8 buffer[4096];
ssize_t nread = recv(m_socket->fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
if (nread < 0) {
if (errno == EAGAIN)
auto maybe_nread = m_socket->read_without_waiting({ buffer, 4096 });
if (maybe_nread.is_error()) {
auto error = maybe_nread.release_error();
if (error.is_syscall() && error.code() == EAGAIN) {
break;
perror("recv");
exit(1);
}
dbgln("ConnectionBase::read_as_much_as_possible_from_socket_without_blocking: {}", error);
warnln("ConnectionBase::read_as_much_as_possible_from_socket_without_blocking: {}", error);
VERIFY_NOT_REACHED();
}
auto nread = maybe_nread.release_value();
if (nread == 0) {
if (bytes.is_empty()) {
deferred_invoke([this] { shutdown(); });
@ -141,6 +144,7 @@ ErrorOr<Vector<u8>> ConnectionBase::read_as_much_as_possible_from_socket_without
}
break;
}
bytes.append(buffer, nread);
}

View file

@ -11,8 +11,8 @@
#include <AK/Try.h>
#include <LibCore/Event.h>
#include <LibCore/EventLoop.h>
#include <LibCore/LocalSocket.h>
#include <LibCore/Notifier.h>
#include <LibCore/Stream.h>
#include <LibCore/Timer.h>
#include <LibIPC/Forward.h>
#include <LibIPC/Message.h>
@ -39,9 +39,9 @@ public:
virtual void die() { }
protected:
explicit ConnectionBase(IPC::Stub&, NonnullRefPtr<Core::LocalSocket>, u32 local_endpoint_magic);
explicit ConnectionBase(IPC::Stub&, NonnullOwnPtr<Core::Stream::LocalSocket>, u32 local_endpoint_magic);
Core::LocalSocket& socket() { return *m_socket; }
Core::Stream::LocalSocket& socket() { return *m_socket; }
virtual void may_have_become_unresponsive() { }
virtual void did_become_responsive() { }
@ -57,10 +57,9 @@ protected:
IPC::Stub& m_local_stub;
NonnullRefPtr<Core::LocalSocket> m_socket;
NonnullOwnPtr<Core::Stream::LocalSocket> m_socket;
RefPtr<Core::Timer> m_responsiveness_timer;
RefPtr<Core::Notifier> m_notifier;
NonnullOwnPtrVector<Message> m_unprocessed_messages;
ByteBuffer m_unprocessed_bytes;
@ -70,10 +69,10 @@ protected:
template<typename LocalEndpoint, typename PeerEndpoint>
class Connection : public ConnectionBase {
public:
Connection(IPC::Stub& local_stub, NonnullRefPtr<Core::LocalSocket> socket)
Connection(IPC::Stub& local_stub, NonnullOwnPtr<Core::Stream::LocalSocket> socket)
: ConnectionBase(local_stub, move(socket), LocalEndpoint::static_magic())
{
m_notifier->on_ready_to_read = [this] {
m_socket->on_ready_to_read = [this] {
NonnullRefPtr protect = *this;
// FIXME: Do something about errors.
(void)drain_messages_from_peer();
@ -122,9 +121,9 @@ protected:
break;
index += sizeof(message_size);
auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size };
if (auto message = LocalEndpoint::decode_message(remaining_bytes, m_socket->fd())) {
if (auto message = LocalEndpoint::decode_message(remaining_bytes, *m_socket)) {
m_unprocessed_messages.append(message.release_nonnull());
} else if (auto message = PeerEndpoint::decode_message(remaining_bytes, m_socket->fd())) {
} else if (auto message = PeerEndpoint::decode_message(remaining_bytes, *m_socket)) {
m_unprocessed_messages.append(message.release_nonnull());
} else {
dbgln("Failed to parse a message");

View file

@ -162,14 +162,9 @@ ErrorOr<void> Decoder::decode(Dictionary& dictionary)
ErrorOr<void> Decoder::decode([[maybe_unused]] File& file)
{
#ifdef __serenity__
int fd = TRY(Core::System::recvfd(m_sockfd, O_CLOEXEC));
int fd = TRY(m_socket.receive_fd(O_CLOEXEC));
file = File(fd, File::ConstructWithReceivedFileDescriptor);
return {};
#else
[[maybe_unused]] auto fd = m_sockfd;
return Error::from_string_literal("File descriptor passing not supported on this platform");
#endif
}
ErrorOr<void> decode(Decoder& decoder, Core::AnonymousBuffer& buffer)

View file

@ -11,6 +11,7 @@
#include <AK/NumericLimits.h>
#include <AK/StdLibExtras.h>
#include <AK/String.h>
#include <LibCore/Stream.h>
#include <LibIPC/Forward.h>
#include <LibIPC/Message.h>
@ -25,9 +26,9 @@ inline ErrorOr<void> decode(Decoder&, T&)
class Decoder {
public:
Decoder(InputMemoryStream& stream, int sockfd)
Decoder(InputMemoryStream& stream, Core::Stream::LocalSocket& socket)
: m_stream(stream)
, m_sockfd(sockfd)
, m_socket(socket)
{
}
@ -115,7 +116,7 @@ public:
private:
InputMemoryStream& m_stream;
int m_sockfd { -1 };
Core::Stream::LocalSocket& m_socket;
};
}

View file

@ -6,10 +6,24 @@
#pragma once
#include <LibCore/Stream.h>
#include <LibIPC/Connection.h>
namespace IPC {
#define IPC_CLIENT_CONNECTION(klass, socket_path) \
C_OBJECT_ABSTRACT(klass) \
public: \
template<typename Klass = klass, class... Args> \
static ErrorOr<NonnullRefPtr<klass>> try_create(Args&&... args) \
{ \
auto socket = TRY(Core::Stream::LocalSocket::connect(socket_path)); \
/* We want to rate-limit our clients */ \
TRY(socket->set_blocking(true)); \
\
return adopt_nonnull_ref_or_enomem(new (nothrow) Klass(move(socket), forward<Args>(args)...)); \
}
template<typename ClientEndpoint, typename ServerEndpoint>
class ServerConnection : public IPC::Connection<ClientEndpoint, ServerEndpoint>
, public ClientEndpoint::Stub
@ -18,19 +32,10 @@ public:
using ClientStub = typename ClientEndpoint::Stub;
using IPCProxy = typename ServerEndpoint::template Proxy<ClientEndpoint>;
ServerConnection(ClientStub& local_endpoint, StringView address)
: Connection<ClientEndpoint, ServerEndpoint>(local_endpoint, Core::LocalSocket::construct())
ServerConnection(ClientStub& local_endpoint, NonnullOwnPtr<Core::Stream::LocalSocket> socket)
: Connection<ClientEndpoint, ServerEndpoint>(local_endpoint, move(socket))
, ServerEndpoint::template Proxy<ClientEndpoint>(*this, {})
{
// We want to rate-limit our clients
this->socket().set_blocking(true);
if (!this->socket().connect(Core::SocketAddress::local(address))) {
perror("connect");
VERIFY_NOT_REACHED();
}
VERIFY(this->socket().is_connected());
}
virtual void die() override

View file

@ -6,14 +6,16 @@
#pragma once
#include <LibCore/System.h>
#include <LibIPC/ClientConnection.h>
#include <LibIPC/SystemServerTakeover.h>
namespace IPC {
template<typename ClientConnectionType>
ErrorOr<NonnullRefPtr<ClientConnectionType>> take_over_accepted_client_from_system_server()
{
auto socket = TRY(Core::LocalSocket::take_over_accepted_socket_from_system_server());
auto socket = TRY(take_over_accepted_socket_from_system_server());
return IPC::new_client_connection<ClientConnectionType>(move(socket));
}

View file

@ -0,0 +1,65 @@
/*
* Copyright (c) 2022, sin-ack <sin-ack@protonmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include "SystemServerTakeover.h"
#include <LibCore/System.h>
HashMap<String, int> s_overtaken_sockets {};
bool s_overtaken_sockets_parsed { false };
void parse_sockets_from_system_server()
{
VERIFY(!s_overtaken_sockets_parsed);
constexpr auto socket_takeover = "SOCKET_TAKEOVER";
const char* sockets = getenv(socket_takeover);
if (!sockets) {
s_overtaken_sockets_parsed = true;
return;
}
for (auto& socket : StringView(sockets).split_view(' ')) {
auto params = socket.split_view(':');
s_overtaken_sockets.set(params[0].to_string(), strtol(params[1].to_string().characters(), nullptr, 10));
}
s_overtaken_sockets_parsed = true;
// We wouldn't want our children to think we're passing
// them a socket either, so unset the env variable.
unsetenv(socket_takeover);
}
ErrorOr<NonnullOwnPtr<Core::Stream::LocalSocket>> take_over_accepted_socket_from_system_server(String const& socket_path)
{
if (!s_overtaken_sockets_parsed)
parse_sockets_from_system_server();
int fd;
if (socket_path.is_null()) {
// We want the first (and only) socket.
VERIFY(s_overtaken_sockets.size() == 1);
fd = s_overtaken_sockets.begin()->value;
} else {
auto it = s_overtaken_sockets.find(socket_path);
if (it == s_overtaken_sockets.end())
return Error::from_string_literal("Non-existent socket requested"sv);
fd = it->value;
}
// Sanity check: it has to be a socket.
auto stat = TRY(Core::System::fstat(fd));
if (!S_ISSOCK(stat.st_mode))
return Error::from_string_literal("The fd we got from SystemServer is not a socket"sv);
auto socket = TRY(Core::Stream::LocalSocket::adopt_fd(fd));
// It had to be !CLOEXEC for obvious reasons, but we
// don't need it to be !CLOEXEC anymore, so set the
// CLOEXEC flag now.
TRY(socket->set_close_on_exec(true));
return socket;
}

View file

@ -0,0 +1,12 @@
/*
* Copyright (c) 2022, sin-ack <sin-ack@protonmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <LibCore/Stream.h>
void parse_sockets_from_system_server();
ErrorOr<NonnullOwnPtr<Core::Stream::LocalSocket>> take_over_accepted_socket_from_system_server(String const& socket_path = {});