diff --git a/Tests/LibWeb/Text/expected/Messaging/Messaging-post-channel-over-channel.txt b/Tests/LibWeb/Text/expected/Messaging/Messaging-post-channel-over-channel.txt new file mode 100644 index 0000000000..192dd3d091 --- /dev/null +++ b/Tests/LibWeb/Text/expected/Messaging/Messaging-post-channel-over-channel.txt @@ -0,0 +1,6 @@ +Port1: "Hello" +Port1: {"foo":{}} +Port1: "DONE" +Port2: "Hello" +Port3: "Hello from the transferred port" +Port2: "DONE" diff --git a/Tests/LibWeb/Text/input/Messaging/Messaging-post-channel-over-channel.html b/Tests/LibWeb/Text/input/Messaging/Messaging-post-channel-over-channel.html new file mode 100644 index 0000000000..bc958f4566 --- /dev/null +++ b/Tests/LibWeb/Text/input/Messaging/Messaging-post-channel-over-channel.html @@ -0,0 +1,32 @@ + + diff --git a/Userland/Libraries/LibWeb/HTML/MessagePort.cpp b/Userland/Libraries/LibWeb/HTML/MessagePort.cpp index 2a0c9d7bfb..0d71a6e7e5 100644 --- a/Userland/Libraries/LibWeb/HTML/MessagePort.cpp +++ b/Userland/Libraries/LibWeb/HTML/MessagePort.cpp @@ -1,18 +1,20 @@ /* * Copyright (c) 2021, Andreas Kling + * Copyright (c) 2023, Andrew Kaster * * SPDX-License-Identifier: BSD-2-Clause */ +#include #include #include +#include +#include #include #include #include #include #include -#include -#include #include #include #include @@ -34,7 +36,10 @@ MessagePort::MessagePort(JS::Realm& realm) { } -MessagePort::~MessagePort() = default; +MessagePort::~MessagePort() +{ + disentangle(); +} void MessagePort::initialize(JS::Realm& realm) { @@ -67,6 +72,11 @@ WebIDL::ExceptionOr MessagePort::transfer_steps(HTML::TransferDataHolder& m_socket = nullptr; data_holder.fds.append(fd); data_holder.data.append(IPC_FILE_TAG); + + auto fd_passing_socket = MUST(m_fd_passing_socket->release_fd()); + m_fd_passing_socket = nullptr; + data_holder.fds.append(fd_passing_socket); + data_holder.data.append(IPC_FILE_TAG); } // 4. Otherwise, set dataHolder.[[RemotePort]] to null. @@ -91,7 +101,12 @@ WebIDL::ExceptionOr MessagePort::transfer_receiving_steps(HTML::TransferDa auto fd_tag = data_holder.data.take_first(); if (fd_tag == IPC_FILE_TAG) { auto fd = data_holder.fds.take_first(); - m_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd())); + m_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd(), Core::LocalSocket::PreventSIGPIPE::Yes)); + + fd_tag = data_holder.data.take_first(); + VERIFY(fd_tag == IPC_FILE_TAG); + fd = data_holder.fds.take_first(); + m_fd_passing_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd(), Core::LocalSocket::PreventSIGPIPE::Yes)); } else if (fd_tag != 0) { dbgln("Unexpected byte {:x} in MessagePort transfer data", fd_tag); VERIFY_NOT_REACHED(); @@ -102,10 +117,12 @@ WebIDL::ExceptionOr MessagePort::transfer_receiving_steps(HTML::TransferDa void MessagePort::disentangle() { - m_remote_port->m_remote_port = nullptr; + if (m_remote_port) + m_remote_port->m_remote_port = nullptr; m_remote_port = nullptr; m_socket = nullptr; + m_fd_passing_socket = nullptr; } // https://html.spec.whatwg.org/multipage/web-messaging.html#entangle @@ -125,17 +142,34 @@ void MessagePort::entangle_with(MessagePort& remote_port) remote_port.m_remote_port = this; m_remote_port = &remote_port; - int fds[2] = {}; - MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds)); - auto socket0 = MUST(Core::LocalSocket::adopt_fd(fds[0])); - MUST(socket0->set_blocking(false)); - MUST(socket0->set_close_on_exec(true)); - auto socket1 = MUST(Core::LocalSocket::adopt_fd(fds[1])); - MUST(socket1->set_blocking(false)); - MUST(socket1->set_close_on_exec(true)); + auto create_paired_sockets = []() -> Array, 2> { + int fds[2] = {}; + MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds)); + auto socket0 = MUST(Core::LocalSocket::adopt_fd(fds[0], Core::LocalSocket::PreventSIGPIPE::Yes)); + MUST(socket0->set_blocking(false)); + MUST(socket0->set_close_on_exec(true)); + auto socket1 = MUST(Core::LocalSocket::adopt_fd(fds[1], Core::LocalSocket::PreventSIGPIPE::Yes)); + MUST(socket1->set_blocking(false)); + MUST(socket1->set_close_on_exec(true)); - m_socket = move(socket0); - m_remote_port->m_socket = move(socket1); + return Array { move(socket0), move(socket1) }; + }; + + auto sockets = create_paired_sockets(); + m_socket = move(sockets[0]); + m_remote_port->m_socket = move(sockets[1]); + + m_socket->on_ready_to_read = [strong_this = JS::make_handle(this)]() { + strong_this->read_from_socket(); + }; + + m_remote_port->m_socket->on_ready_to_read = [remote_port = JS::make_handle(m_remote_port)]() { + remote_port->read_from_socket(); + }; + + auto fd_sockets = create_paired_sockets(); + m_fd_passing_socket = move(fd_sockets[0]); + m_remote_port->m_fd_passing_socket = move(fd_sockets[1]); } // https://html.spec.whatwg.org/multipage/web-messaging.html#dom-messageport-postmessage-options @@ -193,58 +227,167 @@ WebIDL::ExceptionOr MessagePort::message_port_post_message_steps(JS::GCPtr auto serialize_with_transfer_result = TRY(structured_serialize_with_transfer(vm, message, transfer)); // 6. If targetPort is null, or if doomed is true, then return. - if (!target_port || doomed) + // IMPLEMENTATION DEFINED: Actually check the socket here, not the target port. + // If there's no target message port in the same realm, we still want to send the message over IPC + if (!m_socket || doomed) { return {}; + } - // FIXME: 7. Add a task that runs the following steps to the port message queue of targetPort: - // FIXME: Implement this using the port message queue/unshipped port message queue concept - main_thread_event_loop().task_queue().add(HTML::Task::create(HTML::Task::Source::PostedMessage, nullptr, [target_port, serialize_with_transfer_result = move(serialize_with_transfer_result)]() mutable { - // FIXME: 1. Let finalTargetPort be the MessagePort in whose port message queue the task now finds itself. - // FIXME: NOTE: This can be different from targetPort, if targetPort itself was transferred and thus all its tasks moved along with it. - auto final_target_port = target_port; + // 7. Add a task that runs the following steps to the port message queue of targetPort: + post_port_message(move(serialize_with_transfer_result)); - // 2. Let targetRealm be finalTargetPort's relevant realm. - auto& target_realm = relevant_realm(*final_target_port); - auto& target_vm = target_realm.vm(); + return {}; +} - // 3. Let deserializeRecord be StructuredDeserializeWithTransfer(serializeWithTransferResult, targetRealm). - TemporaryExecutionContext context { relevant_settings_object(*final_target_port) }; - auto deserialize_record_or_error = structured_deserialize_with_transfer(target_vm, serialize_with_transfer_result); - if (deserialize_record_or_error.is_error()) { - // If this throws an exception, catch it, fire an event named messageerror at finalTargetPort, using MessageEvent, and then return. - auto exception = deserialize_record_or_error.release_error(); - MessageEventInit event_init {}; - final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::messageerror, event_init)); - return; +ErrorOr MessagePort::send_message_on_socket(SerializedTransferRecord const& serialize_with_transfer_result) +{ + IPC::MessageBuffer buffer; + IPC::Encoder encoder(buffer); + MUST(encoder.encode(0)); // placeholder for total size + MUST(encoder.encode(serialize_with_transfer_result)); + + u32 buffer_size = buffer.data.size() - sizeof(u32); // size of *payload* + buffer.data[0] = buffer_size & 0xFF; + buffer.data[1] = (buffer_size >> 8) & 0xFF; + buffer.data[2] = (buffer_size >> 16) & 0xFF; + buffer.data[3] = (buffer_size >> 24) & 0xFF; + + for (auto& fd : buffer.fds) { + if (auto result = m_fd_passing_socket->send_fd(fd->value()); result.is_error()) { + return Error::from_string_view("Can't send fd"sv); } - auto deserialize_record = deserialize_record_or_error.release_value(); + } - // 4. Let messageClone be deserializeRecord.[[Deserialized]]. - auto message_clone = deserialize_record.deserialized; - - // 5. Let newPorts be a new frozen array consisting of all MessagePort objects in deserializeRecord.[[TransferredValues]], if any, maintaining their relative order. - // FIXME: Use a FrozenArray - Vector> new_ports; - for (auto const& object : deserialize_record.transferred_values) { - if (is(*object)) { - new_ports.append(object); + ReadonlyBytes bytes_to_write { buffer.data.span() }; + int writes_done = 0; + size_t initial_size = bytes_to_write.size(); + while (!bytes_to_write.is_empty()) { + auto maybe_nwritten = m_socket->write_some(bytes_to_write); + writes_done++; + if (maybe_nwritten.is_error()) { + auto error = maybe_nwritten.release_error(); + if (error.is_errno()) { + // FIXME: This is a hacky way to at least not crash on large messages + // The limit of 100 writes is arbitrary, and there to prevent indefinite spinning on the EventLoop + if (error.code() == EAGAIN && writes_done < 100) { + sched_yield(); + continue; + } + switch (error.code()) { + case EPIPE: + return Error::from_string_literal("IPC::Connection::post_message: Disconnected from peer"); + case EAGAIN: + return Error::from_string_literal("IPC::Connection::post_message: Peer buffer overflowed"); + default: + return Error::from_syscall("IPC::Connection::post_message write"sv, -error.code()); + } + } else { + return error; } } - // 6. Fire an event named message at finalTargetPort, using MessageEvent, with the data attribute initialized to messageClone and the ports attribute initialized to newPorts. - MessageEventInit event_init {}; - event_init.data = message_clone; - event_init.ports = move(new_ports); - final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::message, event_init)); - })); - + bytes_to_write = bytes_to_write.slice(maybe_nwritten.value()); + } + if (writes_done > 1) { + dbgln("LibIPC::Connection FIXME Warning, needed {} writes needed to send message of size {}B, this is pretty bad, as it spins on the EventLoop", writes_done, initial_size); + } return {}; } +void MessagePort::post_port_message(SerializedTransferRecord serialize_with_transfer_result) +{ + // FIXME: Use the correct task source? + queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), [this, serialize_with_transfer_result = move(serialize_with_transfer_result)]() mutable { + if (!m_socket || !m_socket->is_open()) + return; + if (auto result = send_message_on_socket(serialize_with_transfer_result); result.is_error()) { + dbgln("Failed to post message: {}", result.error()); + disentangle(); + } + }); +} + +void MessagePort::read_from_socket() +{ + auto num_bytes_ready = MUST(m_socket->pending_bytes()); + switch (m_socket_state) { + case SocketState::Header: { + if (num_bytes_ready < sizeof(u32)) + break; + m_socket_incoming_message_size = MUST(m_socket->read_value()); + num_bytes_ready -= sizeof(u32); + m_socket_state = SocketState::Data; + } + [[fallthrough]]; + case SocketState::Data: { + if (num_bytes_ready < m_socket_incoming_message_size) + break; + + Vector data; + data.resize(m_socket_incoming_message_size, true); + MUST(m_socket->read_until_filled(data)); + + FixedMemoryStream stream { data, FixedMemoryStream::Mode::ReadOnly }; + IPC::Decoder decoder(stream, *m_fd_passing_socket); + + auto serialize_with_transfer_result = MUST(decoder.decode()); + + post_message_task_steps(serialize_with_transfer_result); + m_socket_state = SocketState::Header; + break; + } + case SocketState::Error: + VERIFY_NOT_REACHED(); + break; + } +} + +void MessagePort::post_message_task_steps(SerializedTransferRecord& serialize_with_transfer_result) +{ + // 1. Let finalTargetPort be the MessagePort in whose port message queue the task now finds itself. + // NOTE: This can be different from targetPort, if targetPort itself was transferred and thus all its tasks moved along with it. + auto* final_target_port = this; + + // 2. Let targetRealm be finalTargetPort's relevant realm. + auto& target_realm = relevant_realm(*final_target_port); + auto& target_vm = target_realm.vm(); + + // 3. Let deserializeRecord be StructuredDeserializeWithTransfer(serializeWithTransferResult, targetRealm). + TemporaryExecutionContext context { relevant_settings_object(*final_target_port) }; + auto deserialize_record_or_error = structured_deserialize_with_transfer(target_vm, serialize_with_transfer_result); + if (deserialize_record_or_error.is_error()) { + // If this throws an exception, catch it, fire an event named messageerror at finalTargetPort, using MessageEvent, and then return. + auto exception = deserialize_record_or_error.release_error(); + MessageEventInit event_init {}; + final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::messageerror, event_init)); + return; + } + auto deserialize_record = deserialize_record_or_error.release_value(); + + // 4. Let messageClone be deserializeRecord.[[Deserialized]]. + auto message_clone = deserialize_record.deserialized; + + // 5. Let newPorts be a new frozen array consisting of all MessagePort objects in deserializeRecord.[[TransferredValues]], if any, maintaining their relative order. + // FIXME: Use a FrozenArray + Vector> new_ports; + for (auto const& object : deserialize_record.transferred_values) { + if (is(*object)) { + new_ports.append(object); + } + } + + // 6. Fire an event named message at finalTargetPort, using MessageEvent, with the data attribute initialized to messageClone and the ports attribute initialized to newPorts. + MessageEventInit event_init {}; + event_init.data = message_clone; + event_init.ports = move(new_ports); + final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::message, event_init)); +} + // https://html.spec.whatwg.org/multipage/web-messaging.html#dom-messageport-start void MessagePort::start() { VERIFY(m_socket); + VERIFY(m_fd_passing_socket); // TODO: The start() method steps are to enable this's port message queue, if it is not already enabled. } diff --git a/Userland/Libraries/LibWeb/HTML/MessagePort.h b/Userland/Libraries/LibWeb/HTML/MessagePort.h index c7a69d7f6c..13a36ba460 100644 --- a/Userland/Libraries/LibWeb/HTML/MessagePort.h +++ b/Userland/Libraries/LibWeb/HTML/MessagePort.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2021, Andreas Kling + * Copyright (c) 2023, Andrew Kaster * * SPDX-License-Identifier: BSD-2-Clause */ @@ -70,6 +71,10 @@ private: void disentangle(); WebIDL::ExceptionOr message_port_post_message_steps(JS::GCPtr target_port, JS::Value message, StructuredSerializeOptions const& options); + void post_message_task_steps(SerializedTransferRecord&); + void post_port_message(SerializedTransferRecord); + ErrorOr send_message_on_socket(SerializedTransferRecord const&); + void read_from_socket(); // The HTML spec implies(!) that this is MessagePort.[[RemotePort]] JS::GCPtr m_remote_port; @@ -78,6 +83,14 @@ private: bool m_has_been_shipped { false }; OwnPtr m_socket; + OwnPtr m_fd_passing_socket; + + enum class SocketState : u8 { + Header, + Data, + Error, + } m_socket_state { SocketState::Header }; + size_t m_socket_incoming_message_size { 0 }; }; } diff --git a/Userland/Libraries/LibWeb/HTML/MessagePort.idl b/Userland/Libraries/LibWeb/HTML/MessagePort.idl index e7f5d08506..14a70e0633 100644 --- a/Userland/Libraries/LibWeb/HTML/MessagePort.idl +++ b/Userland/Libraries/LibWeb/HTML/MessagePort.idl @@ -4,7 +4,8 @@ // https://html.spec.whatwg.org/multipage/web-messaging.html#messageport [Exposed=(Window,Worker,AudioWorklet), Transferable] interface MessagePort : EventTarget { - undefined postMessage(any message, sequence transfer); + // FIXME: IDL Overload resolution fails here + // FIXME: undefined postMessage(any message, sequence transfer); undefined postMessage(any message, optional StructuredSerializeOptions options = {}); undefined start(); undefined close(); diff --git a/Userland/Libraries/LibWeb/HTML/StructuredSerialize.cpp b/Userland/Libraries/LibWeb/HTML/StructuredSerialize.cpp index 6f5524f5b9..bda0e4c3c0 100644 --- a/Userland/Libraries/LibWeb/HTML/StructuredSerialize.cpp +++ b/Userland/Libraries/LibWeb/HTML/StructuredSerialize.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -1129,3 +1131,39 @@ WebIDL::ExceptionOr structured_deserialize(JS::VM& vm, SerializationR } } + +namespace IPC { + +template<> +ErrorOr encode(Encoder& encoder, ::Web::HTML::TransferDataHolder const& data_holder) +{ + TRY(encoder.encode(data_holder.data)); + TRY(encoder.encode(data_holder.fds)); + return {}; +} + +template<> +ErrorOr encode(Encoder& encoder, ::Web::HTML::SerializedTransferRecord const& record) +{ + TRY(encoder.encode(record.serialized)); + TRY(encoder.encode(record.transfer_data_holders)); + return {}; +} + +template<> +ErrorOr<::Web::HTML::TransferDataHolder> decode(Decoder& decoder) +{ + auto data = TRY(decoder.decode>()); + auto fds = TRY(decoder.decode>()); + return ::Web::HTML::TransferDataHolder { move(data), move(fds) }; +} + +template<> +ErrorOr<::Web::HTML::SerializedTransferRecord> decode(Decoder& decoder) +{ + auto serialized = TRY(decoder.decode>()); + auto transfer_data_holders = TRY(decoder.decode>()); + return ::Web::HTML::SerializedTransferRecord { move(serialized), move(transfer_data_holders) }; +} + +} diff --git a/Userland/Libraries/LibWeb/HTML/StructuredSerialize.h b/Userland/Libraries/LibWeb/HTML/StructuredSerialize.h index 97d987ffc8..6a2fe2ce11 100644 --- a/Userland/Libraries/LibWeb/HTML/StructuredSerialize.h +++ b/Userland/Libraries/LibWeb/HTML/StructuredSerialize.h @@ -54,3 +54,19 @@ WebIDL::ExceptionOr structured_serialize_with_transfer WebIDL::ExceptionOr structured_deserialize_with_transfer(JS::VM& vm, SerializedTransferRecord&); } + +namespace IPC { + +template<> +ErrorOr encode(Encoder&, ::Web::HTML::SerializedTransferRecord const&); + +template<> +ErrorOr encode(Encoder&, ::Web::HTML::TransferDataHolder const&); + +template<> +ErrorOr<::Web::HTML::SerializedTransferRecord> decode(Decoder&); + +template<> +ErrorOr<::Web::HTML::TransferDataHolder> decode(Decoder&); + +}