mirror of
https://github.com/RGBCube/serenity
synced 2025-07-28 18:57:36 +00:00
LibCore+Userland: Convert TCPServer to use the Serenity Stream API
This is intended as a real-usecase test of the Serenity Stream API, and seemed like a good candidate due to its low amount of users.
This commit is contained in:
parent
2341b0159a
commit
dfdb52efa7
11 changed files with 263 additions and 124 deletions
|
@ -5,34 +5,52 @@
|
|||
*/
|
||||
|
||||
#include "Client.h"
|
||||
#include "LibCore/EventLoop.h"
|
||||
|
||||
Client::Client(int id, RefPtr<Core::TCPSocket> socket)
|
||||
Client::Client(int id, Core::Stream::TCPSocket socket)
|
||||
: m_id(id)
|
||||
, m_socket(move(socket))
|
||||
{
|
||||
m_socket->on_ready_to_read = [this] { drain_socket(); };
|
||||
m_socket.on_ready_to_read = [this] {
|
||||
if (m_socket.is_eof())
|
||||
return;
|
||||
|
||||
auto result = drain_socket();
|
||||
if (result.is_error()) {
|
||||
dbgln("Failed while trying to drain the socket: {}", result.error());
|
||||
Core::deferred_invoke([this, strong_this = NonnullRefPtr(*this)] { quit(); });
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
void Client::drain_socket()
|
||||
ErrorOr<void> Client::drain_socket()
|
||||
{
|
||||
NonnullRefPtr<Client> protect(*this);
|
||||
while (m_socket->can_read()) {
|
||||
auto buf = m_socket->read(1024);
|
||||
|
||||
dbgln("Read {} bytes.", buf.size());
|
||||
auto maybe_buffer = ByteBuffer::create_uninitialized(1024);
|
||||
if (!maybe_buffer.has_value())
|
||||
return ENOMEM;
|
||||
auto buffer = maybe_buffer.release_value();
|
||||
|
||||
if (m_socket->eof()) {
|
||||
quit();
|
||||
while (TRY(m_socket.can_read_without_blocking())) {
|
||||
auto nread = TRY(m_socket.read(buffer));
|
||||
|
||||
dbgln("Read {} bytes.", nread);
|
||||
|
||||
if (m_socket.is_eof()) {
|
||||
Core::deferred_invoke([this, strong_this = NonnullRefPtr(*this)] { quit(); });
|
||||
break;
|
||||
}
|
||||
|
||||
m_socket->write(buf);
|
||||
TRY(m_socket.write({ buffer.data(), nread }));
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
void Client::quit()
|
||||
{
|
||||
m_socket->close();
|
||||
m_socket.close();
|
||||
if (on_exit)
|
||||
on_exit();
|
||||
}
|
||||
|
|
|
@ -6,11 +6,11 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <LibCore/TCPSocket.h>
|
||||
#include <LibCore/Stream.h>
|
||||
|
||||
class Client : public RefCounted<Client> {
|
||||
public:
|
||||
static NonnullRefPtr<Client> create(int id, RefPtr<Core::TCPSocket> socket)
|
||||
static NonnullRefPtr<Client> create(int id, Core::Stream::TCPSocket socket)
|
||||
{
|
||||
return adopt_ref(*new Client(id, move(socket)));
|
||||
}
|
||||
|
@ -18,12 +18,12 @@ public:
|
|||
Function<void()> on_exit;
|
||||
|
||||
protected:
|
||||
Client(int id, RefPtr<Core::TCPSocket> socket);
|
||||
Client(int id, Core::Stream::TCPSocket socket);
|
||||
|
||||
void drain_socket();
|
||||
ErrorOr<void> drain_socket();
|
||||
void quit();
|
||||
|
||||
private:
|
||||
int m_id { 0 };
|
||||
RefPtr<Core::TCPSocket> m_socket;
|
||||
Core::Stream::TCPSocket m_socket;
|
||||
};
|
||||
|
|
|
@ -52,15 +52,15 @@ int main(int argc, char** argv)
|
|||
server->on_ready_to_accept = [&next_id, &clients, &server] {
|
||||
int id = next_id++;
|
||||
|
||||
auto client_socket = server->accept();
|
||||
if (!client_socket) {
|
||||
perror("accept");
|
||||
auto maybe_client_socket = server->accept();
|
||||
if (maybe_client_socket.is_error()) {
|
||||
warnln("accept: {}", maybe_client_socket.error());
|
||||
return;
|
||||
}
|
||||
|
||||
outln("Client {} connected", id);
|
||||
|
||||
auto client = Client::create(id, move(client_socket));
|
||||
auto client = Client::create(id, maybe_client_socket.release_value());
|
||||
client->on_exit = [&clients, id] {
|
||||
Core::deferred_invoke([&clients, id] {
|
||||
clients.remove(id);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue