mirror of
https://github.com/RGBCube/serenity
synced 2025-05-31 18:18:12 +00:00
AudioServer: Port to the new generated IPC mechanism
Fork the IPC Connection classes into Server:: and Client::ConnectionNG. The new IPC messages are serialized very snugly instead of using the same generic data structure for all messages. Remove ASAPI.h since we now generate all of it from AudioServer.ipc :^)
This commit is contained in:
parent
3519b6c201
commit
8e684f0959
14 changed files with 327 additions and 329 deletions
|
@ -1,12 +1,11 @@
|
|||
#pragma once
|
||||
|
||||
#include <LibAudio/ASAPI.h>
|
||||
#include <LibCore/CEvent.h>
|
||||
#include <LibCore/CEventLoop.h>
|
||||
#include <LibCore/CLocalSocket.h>
|
||||
#include <LibCore/CNotifier.h>
|
||||
#include <LibCore/CSyscallUtils.h>
|
||||
|
||||
#include <LibIPC/IMessage.h>
|
||||
#include <stdio.h>
|
||||
#include <sys/select.h>
|
||||
#include <sys/socket.h>
|
||||
|
@ -236,5 +235,148 @@ namespace Client {
|
|||
int m_my_client_id { -1 };
|
||||
};
|
||||
|
||||
template<typename Endpoint>
|
||||
class ConnectionNG : public CObject {
|
||||
C_OBJECT(Connection)
|
||||
public:
|
||||
ConnectionNG(const StringView& address)
|
||||
: m_notifier(CNotifier(m_connection.fd(), CNotifier::Read))
|
||||
{
|
||||
// We want to rate-limit our clients
|
||||
m_connection.set_blocking(true);
|
||||
m_notifier.on_ready_to_read = [this] {
|
||||
drain_messages_from_server();
|
||||
CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
||||
};
|
||||
|
||||
int retries = 1000;
|
||||
while (retries) {
|
||||
if (m_connection.connect(CSocketAddress::local(address))) {
|
||||
break;
|
||||
}
|
||||
|
||||
dbgprintf("Client::Connection: connect failed: %d, %s\n", errno, strerror(errno));
|
||||
sleep(1);
|
||||
--retries;
|
||||
}
|
||||
ASSERT(m_connection.is_connected());
|
||||
}
|
||||
|
||||
virtual void handshake() = 0;
|
||||
|
||||
virtual void event(CEvent& event) override
|
||||
{
|
||||
if (event.type() == Event::PostProcess) {
|
||||
postprocess_messages(m_unprocessed_messages);
|
||||
} else {
|
||||
CObject::event(event);
|
||||
}
|
||||
}
|
||||
|
||||
void set_server_pid(pid_t pid) { m_server_pid = pid; }
|
||||
pid_t server_pid() const { return m_server_pid; }
|
||||
void set_my_client_id(int id) { m_my_client_id = id; }
|
||||
int my_client_id() const { return m_my_client_id; }
|
||||
|
||||
template<typename MessageType>
|
||||
OwnPtr<MessageType> wait_for_specific_message()
|
||||
{
|
||||
// Double check we don't already have the event waiting for us.
|
||||
// Otherwise we might end up blocked for a while for no reason.
|
||||
for (ssize_t i = 0; i < m_unprocessed_messages.size(); ++i) {
|
||||
if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) {
|
||||
auto message = move(m_unprocessed_messages[i]);
|
||||
m_unprocessed_messages.remove(i);
|
||||
CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
||||
return message;
|
||||
}
|
||||
}
|
||||
for (;;) {
|
||||
fd_set rfds;
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(m_connection.fd(), &rfds);
|
||||
int rc = CSyscallUtils::safe_syscall(select, m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr);
|
||||
if (rc < 0) {
|
||||
perror("select");
|
||||
}
|
||||
ASSERT(rc > 0);
|
||||
ASSERT(FD_ISSET(m_connection.fd(), &rfds));
|
||||
bool success = drain_messages_from_server();
|
||||
if (!success)
|
||||
return nullptr;
|
||||
for (ssize_t i = 0; i < m_unprocessed_messages.size(); ++i) {
|
||||
if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) {
|
||||
auto message = move(m_unprocessed_messages[i]);
|
||||
m_unprocessed_messages.remove(i);
|
||||
CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool post_message_to_server(const IMessage& message)
|
||||
{
|
||||
auto buffer = message.encode();
|
||||
int nwritten = write(m_connection.fd(), buffer.data(), (size_t)buffer.size());
|
||||
if (nwritten < 0) {
|
||||
perror("write");
|
||||
ASSERT_NOT_REACHED();
|
||||
return false;
|
||||
}
|
||||
ASSERT(nwritten == buffer.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
template<typename RequestType, typename... Args>
|
||||
OwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
|
||||
{
|
||||
bool success = post_message_to_server(RequestType(forward<Args>(args)...));
|
||||
ASSERT(success);
|
||||
auto response = wait_for_specific_message<typename RequestType::ResponseType>();
|
||||
ASSERT(response);
|
||||
return response;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void postprocess_messages(Vector<OwnPtr<IMessage>>& new_bundles)
|
||||
{
|
||||
new_bundles.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
bool drain_messages_from_server()
|
||||
{
|
||||
for (;;) {
|
||||
u8 buffer[4096];
|
||||
ssize_t nread = recv(m_connection.fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
|
||||
if (nread < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
return true;
|
||||
}
|
||||
perror("read");
|
||||
exit(1);
|
||||
return false;
|
||||
}
|
||||
if (nread == 0) {
|
||||
dbg() << "EOF on IPC fd";
|
||||
exit(1);
|
||||
return false;
|
||||
}
|
||||
|
||||
auto message = Endpoint::decode_message(ByteBuffer::wrap(buffer, sizeof(buffer)));
|
||||
ASSERT(message);
|
||||
|
||||
m_unprocessed_messages.append(move(message));
|
||||
}
|
||||
}
|
||||
|
||||
CLocalSocket m_connection;
|
||||
CNotifier m_notifier;
|
||||
Vector<OwnPtr<IMessage>> m_unprocessed_messages;
|
||||
int m_server_pid { -1 };
|
||||
int m_my_client_id { -1 };
|
||||
};
|
||||
|
||||
} // Client
|
||||
} // IPC
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue