mirror of
https://github.com/RGBCube/serenity
synced 2025-07-25 17:27:35 +00:00
LibIPC: Prepend each message with its size
This makes it much simpler to determine when we've read a complete message, and will make it possible to integrate recvfd() in the future commit.
This commit is contained in:
parent
d62346c0b1
commit
fa2e3e2be4
3 changed files with 35 additions and 23 deletions
|
@ -66,7 +66,7 @@ public:
|
||||||
GenericLexer lexer { pattern };
|
GenericLexer lexer { pattern };
|
||||||
|
|
||||||
while (!lexer.is_eof()) {
|
while (!lexer.is_eof()) {
|
||||||
// FIXME: It is a bit inconvinient, that 'consume_until' also consumes the 'stop' character, this makes
|
// FIXME: It is a bit inconvenient, that 'consume_until' also consumes the 'stop' character, this makes
|
||||||
// the method less generic because there is no way to check if the 'stop' character ever appeared.
|
// the method less generic because there is no way to check if the 'stop' character ever appeared.
|
||||||
const auto consume_until_without_consuming_stop_character = [&](char stop) {
|
const auto consume_until_without_consuming_stop_character = [&](char stop) {
|
||||||
return lexer.consume_while([&](char ch) { return ch != stop; });
|
return lexer.consume_while([&](char ch) { return ch != stop; });
|
||||||
|
|
|
@ -318,9 +318,9 @@ public:
|
||||||
static i32 static_message_id() { return (int)MessageID::@message.name@; }
|
static i32 static_message_id() { return (int)MessageID::@message.name@; }
|
||||||
virtual const char* message_name() const override { return "@endpoint.name@::@message.name@"; }
|
virtual const char* message_name() const override { return "@endpoint.name@::@message.name@"; }
|
||||||
|
|
||||||
static OwnPtr<@message.name@> decode(InputMemoryStream& stream, size_t& size_in_bytes)
|
static OwnPtr<@message.name@> decode(InputMemoryStream& stream)
|
||||||
{
|
{
|
||||||
IPC::Decoder decoder {stream};
|
IPC::Decoder decoder { stream };
|
||||||
)~~~");
|
)~~~");
|
||||||
|
|
||||||
for (auto& parameter : parameters) {
|
for (auto& parameter : parameters) {
|
||||||
|
@ -359,7 +359,6 @@ public:
|
||||||
message_generator.set("message.constructor_call_parameters", builder.build());
|
message_generator.set("message.constructor_call_parameters", builder.build());
|
||||||
|
|
||||||
message_generator.append(R"~~~(
|
message_generator.append(R"~~~(
|
||||||
size_in_bytes = stream.offset();
|
|
||||||
return make<@message.name@>(@message.constructor_call_parameters@);
|
return make<@message.name@>(@message.constructor_call_parameters@);
|
||||||
}
|
}
|
||||||
)~~~");
|
)~~~");
|
||||||
|
@ -437,7 +436,7 @@ public:
|
||||||
static String static_name() { return "@endpoint.name@"; }
|
static String static_name() { return "@endpoint.name@"; }
|
||||||
virtual String name() const override { return "@endpoint.name@"; }
|
virtual String name() const override { return "@endpoint.name@"; }
|
||||||
|
|
||||||
static OwnPtr<IPC::Message> decode_message(const ByteBuffer& buffer, size_t& size_in_bytes)
|
static OwnPtr<IPC::Message> decode_message(const ByteBuffer& buffer)
|
||||||
{
|
{
|
||||||
InputMemoryStream stream { buffer };
|
InputMemoryStream stream { buffer };
|
||||||
i32 message_endpoint_magic = 0;
|
i32 message_endpoint_magic = 0;
|
||||||
|
@ -489,7 +488,7 @@ public:
|
||||||
|
|
||||||
message_generator.append(R"~~~(
|
message_generator.append(R"~~~(
|
||||||
case (int)Messages::@endpoint.name@::MessageID::@message.name@:
|
case (int)Messages::@endpoint.name@::MessageID::@message.name@:
|
||||||
message = Messages::@endpoint.name@::@message.name@::decode(stream, size_in_bytes);
|
message = Messages::@endpoint.name@::@message.name@::decode(stream);
|
||||||
break;
|
break;
|
||||||
)~~~");
|
)~~~");
|
||||||
};
|
};
|
||||||
|
|
|
@ -35,6 +35,7 @@
|
||||||
#include <LibCore/SyscallUtils.h>
|
#include <LibCore/SyscallUtils.h>
|
||||||
#include <LibCore/Timer.h>
|
#include <LibCore/Timer.h>
|
||||||
#include <LibIPC/Message.h>
|
#include <LibIPC/Message.h>
|
||||||
|
#include <stdint.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <sys/select.h>
|
#include <sys/select.h>
|
||||||
|
@ -75,10 +76,13 @@ public:
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto buffer = message.encode();
|
auto buffer = message.encode();
|
||||||
|
// Prepend the message size.
|
||||||
|
uint32_t message_size = buffer.size();
|
||||||
|
buffer.prepend(reinterpret_cast<const u8*>(&message_size), sizeof(message_size));
|
||||||
|
|
||||||
auto bytes_remaining = buffer.size();
|
size_t total_nwritten = 0;
|
||||||
while (bytes_remaining) {
|
while (total_nwritten < buffer.size()) {
|
||||||
auto nwritten = write(m_socket->fd(), buffer.data(), buffer.size());
|
auto nwritten = write(m_socket->fd(), buffer.data() + total_nwritten, buffer.size() - total_nwritten);
|
||||||
if (nwritten < 0) {
|
if (nwritten < 0) {
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case EPIPE:
|
case EPIPE:
|
||||||
|
@ -95,7 +99,7 @@ public:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bytes_remaining -= nwritten;
|
total_nwritten += nwritten;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_responsiveness_timer->start();
|
m_responsiveness_timer->start();
|
||||||
|
@ -190,25 +194,34 @@ protected:
|
||||||
did_become_responsive();
|
did_become_responsive();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t decoded_bytes = 0;
|
size_t index = 0;
|
||||||
for (size_t index = 0; index < bytes.size(); index += decoded_bytes) {
|
uint32_t message_size = 0;
|
||||||
|
for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
|
||||||
|
message_size = *reinterpret_cast<uint32_t*>(bytes.data() + index);
|
||||||
|
if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
|
||||||
|
break;
|
||||||
|
index += sizeof(message_size);
|
||||||
auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
|
auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
|
||||||
if (auto message = LocalEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
|
if (auto message = LocalEndpoint::decode_message(remaining_bytes)) {
|
||||||
m_unprocessed_messages.append(message.release_nonnull());
|
m_unprocessed_messages.append(message.release_nonnull());
|
||||||
} else if (auto message = PeerEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
|
} else if (auto message = PeerEndpoint::decode_message(remaining_bytes)) {
|
||||||
m_unprocessed_messages.append(message.release_nonnull());
|
m_unprocessed_messages.append(message.release_nonnull());
|
||||||
} else {
|
} else {
|
||||||
// Sometimes we might receive a partial message. That's okay, just stash away
|
dbgln("Failed to parse a message");
|
||||||
// the unprocessed bytes and we'll prepend them to the next incoming message
|
|
||||||
// in the next run of this function.
|
|
||||||
if (!m_unprocessed_bytes.is_empty()) {
|
|
||||||
dbg() << *this << "::drain_messages_from_peer: Already have unprocessed bytes";
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
m_unprocessed_bytes = remaining_bytes.isolated_copy();
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
ASSERT(decoded_bytes);
|
}
|
||||||
|
|
||||||
|
if (index < bytes.size()) {
|
||||||
|
// Sometimes we might receive a partial message. That's okay, just stash away
|
||||||
|
// the unprocessed bytes and we'll prepend them to the next incoming message
|
||||||
|
// in the next run of this function.
|
||||||
|
auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
|
||||||
|
if (!m_unprocessed_bytes.is_empty()) {
|
||||||
|
dbg() << *this << "::drain_messages_from_peer: Already have unprocessed bytes";
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
m_unprocessed_bytes = remaining_bytes.isolated_copy();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!m_unprocessed_messages.is_empty()) {
|
if (!m_unprocessed_messages.is_empty()) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue