mirror of
https://github.com/RGBCube/serenity
synced 2025-07-27 06:57:45 +00:00
LibIPC: Don't handle incoming messages right away when draining
When draining the socket in IServerConnection, we would previously handle each incoming (local endpoint) message as it came in. This would cause unexpected things to happen while blocked waiting for a synchronous response. That's definitely not what we want, so this patch puts all of the incoming messages in a queue and does a separate pass over the queue to handle everything in order.
This commit is contained in:
parent
4a37bec27c
commit
86504f4461
1 changed files with 12 additions and 1 deletions
|
@ -1,5 +1,6 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <AK/NonnullOwnPtrVector.h>
|
||||||
#include <LibCore/CEvent.h>
|
#include <LibCore/CEvent.h>
|
||||||
#include <LibCore/CEventLoop.h>
|
#include <LibCore/CEventLoop.h>
|
||||||
#include <LibCore/CLocalSocket.h>
|
#include <LibCore/CLocalSocket.h>
|
||||||
|
@ -25,6 +26,7 @@ public:
|
||||||
m_connection->set_blocking(true);
|
m_connection->set_blocking(true);
|
||||||
m_notifier->on_ready_to_read = [this] {
|
m_notifier->on_ready_to_read = [this] {
|
||||||
drain_messages_from_server();
|
drain_messages_from_server();
|
||||||
|
handle_messages();
|
||||||
};
|
};
|
||||||
|
|
||||||
int retries = 100000;
|
int retries = 100000;
|
||||||
|
@ -131,7 +133,7 @@ private:
|
||||||
for (size_t index = 0; index < (size_t)bytes.size(); index += decoded_bytes) {
|
for (size_t index = 0; index < (size_t)bytes.size(); index += decoded_bytes) {
|
||||||
auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
|
auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
|
||||||
if (auto message = LocalEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
|
if (auto message = LocalEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
|
||||||
m_local_endpoint.handle(*message);
|
m_unprocessed_messages.append(move(message));
|
||||||
} else if (auto message = PeerEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
|
} else if (auto message = PeerEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
|
||||||
m_unprocessed_messages.append(move(message));
|
m_unprocessed_messages.append(move(message));
|
||||||
} else {
|
} else {
|
||||||
|
@ -142,6 +144,15 @@ private:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void handle_messages()
|
||||||
|
{
|
||||||
|
auto messages = move(m_unprocessed_messages);
|
||||||
|
for (auto& message : messages) {
|
||||||
|
if (message->endpoint_magic() == LocalEndpoint::static_magic())
|
||||||
|
m_local_endpoint.handle(*message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LocalEndpoint& m_local_endpoint;
|
LocalEndpoint& m_local_endpoint;
|
||||||
RefPtr<CLocalSocket> m_connection;
|
RefPtr<CLocalSocket> m_connection;
|
||||||
RefPtr<CNotifier> m_notifier;
|
RefPtr<CNotifier> m_notifier;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue