diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp index bce3844dde..e1a2957c07 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -6,6 +6,7 @@ * SPDX-License-Identifier: BSD-2-Clause */ +#include #include #include #include @@ -1261,6 +1262,262 @@ WebIDL::ExceptionOr set_up_readable_byte_stream_controller(ReadableStream& return {}; } +// https://streams.spec.whatwg.org/#readablestream-enqueue +WebIDL::ExceptionOr readable_stream_enqueue(ReadableStreamController& controller, JS::Value chunk) +{ + // 1. If stream.[[controller]] implements ReadableStreamDefaultController, + if (controller.has>()) { + // 1. Perform ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk). + return readable_stream_default_controller_enqueue(controller.get>(), chunk); + } + // 2. Otherwise, + else { + // 1. Assert: stream.[[controller]] implements ReadableByteStreamController. + VERIFY(controller.has>()); + auto readable_byte_controller = controller.get>(); + + // FIXME: 2. Assert: chunk is an ArrayBufferView. + + // 3. Let byobView be the current BYOB request view for stream. + auto byob_view = readable_byte_controller->byob_request(); + + // 4. If byobView is non-null, and chunk.[[ViewedArrayBuffer]] is byobView.[[ViewedArrayBuffer]], then: + if (byob_view) { + // FIXME: 1. Assert: chunk.[[ByteOffset]] is byobView.[[ByteOffset]]. + // FIXME: 2. Assert: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]]. + // FIXME: 3. Perform ? ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]]). + TODO(); + } + + // 5. Otherwise, perform ? ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk). + return readable_byte_stream_controller_enqueue(readable_byte_controller, chunk); + } +} + +// https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue(ReadableByteStreamController& controller, JS::Value chunk) +{ + auto& vm = controller.vm(); + auto& realm = controller.realm(); + + // 1. Let stream be controller.[[stream]]. + auto stream = controller.stream(); + + // 2. If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return. + if (controller.close_requested() || stream->state() != ReadableStream ::State::Readable) + return {}; + + // 3. Let buffer be chunk.[[ViewedArrayBuffer]]. + auto* typed_array = TRY(JS::typed_array_from(vm, chunk)); + auto* buffer = typed_array->viewed_array_buffer(); + + // 4. Let byteOffset be chunk.[[ByteOffset]]. + auto byte_offset = typed_array->byte_offset(); + + // 5. Let byteLength be chunk.[[ByteLength]]. + auto byte_length = typed_array->byte_length(); + + // 6. If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception. + if (buffer->is_detached()) { + auto error = MUST_OR_THROW_OOM(JS::TypeError::create(realm, "Buffer is detached"sv)); + return JS::throw_completion(error); + } + + // 7. Let transferredBuffer be ? TransferArrayBuffer(buffer). + auto transferred_buffer = TRY(transfer_array_buffer(realm, *buffer)); + + // 8. If controller.[[pendingPullIntos]] is not empty, + if (!controller.pending_pull_intos().is_empty()) { + // 1. Let firstPendingPullInto be controller.[[pendingPullIntos]][0]. + auto& first_pending_pull_into = controller.pending_pull_intos().first(); + + // 2. If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception. + if (first_pending_pull_into.buffer->is_detached()) { + auto error = MUST_OR_THROW_OOM(JS::TypeError::create(realm, "Buffer is detached"sv)); + return JS::throw_completion(error); + } + + // 3. Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller). + readable_byte_stream_controller_invalidate_byob_request(controller); + + // 4. Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer). + first_pending_pull_into.buffer = TRY(transfer_array_buffer(realm, first_pending_pull_into.buffer)); + + // 5. If firstPendingPullInto’s reader type is "none", perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto). + if (first_pending_pull_into.reader_type == ReaderType::None) + TRY(readable_byte_stream_controller_enqueue_detached_pull_into_queue(controller, first_pending_pull_into)); + } + + // 9. If ! ReadableStreamHasDefaultReader(stream) is true, + if (readable_stream_has_default_reader(*stream)) { + // 1. Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller). + TRY(readable_byte_stream_controller_process_read_requests_using_queue(controller)); + + // 2. If ! ReadableStreamGetNumReadRequests(stream) is 0, + if (readable_stream_get_num_read_requests(*stream) == 0) { + // 1. Assert: controller.[[pendingPullIntos]] is empty. + VERIFY(controller.pending_pull_intos().is_empty()); + + // 2. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength). + readable_byte_stream_controller_enqueue_chunk_to_queue(controller, transferred_buffer, byte_offset, byte_length); + } + // 3. Otherwise. + else { + // 1. Assert: controller.[[queue]] is empty. + VERIFY(controller.queue().is_empty()); + + // 2. If controller.[[pendingPullIntos]] is not empty, + if (!controller.pending_pull_intos().is_empty()) { + // 1. Assert: controller.[[pendingPullIntos]][0]'s reader type is "default". + VERIFY(controller.pending_pull_intos().first().reader_type == ReaderType::Default); + + // 2. Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller). + readable_byte_stream_controller_shift_pending_pull_into(controller); + } + + // 3. Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »). + auto transferred_view = MUST_OR_THROW_OOM(JS::construct(vm, *realm.intrinsics().uint8_array_constructor(), transferred_buffer, JS::Value(byte_offset), JS::Value(byte_length))); + + // 4. Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false). + readable_stream_fulfill_read_request(*stream, transferred_view, false); + } + } + // 10. Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true, + else if (readable_stream_has_byob_reader(*stream)) { + // FIXME: 1. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength). + // FIXME: 2. Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller). + TODO(); + } + // 11. Otherwise, + else { + // 1. Assert: ! IsReadableStreamLocked(stream) is false. + VERIFY(!is_readable_stream_locked(*stream)); + + // 2. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength). + readable_byte_stream_controller_enqueue_chunk_to_queue(controller, transferred_buffer, byte_offset, byte_length); + } + + // 12. Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller). + TRY(readable_byte_stream_controller_call_pull_if_needed(controller)); + + return {}; +} + +// https://streams.spec.whatwg.org/#transfer-array-buffer +WebIDL::ExceptionOr> transfer_array_buffer(JS::Realm& realm, JS::ArrayBuffer& buffer) +{ + auto& vm = realm.vm(); + + // 1. Assert: ! IsDetachedBuffer(O) is false. + VERIFY(!buffer.is_detached()); + + // 2. Let arrayBufferData be O.[[ArrayBufferData]]. + // 3. Let arrayBufferByteLength be O.[[ArrayBufferByteLength]]. + auto array_buffer = buffer.buffer(); + + // 4. Perform ? DetachArrayBuffer(O). + TRY(JS::detach_array_buffer(vm, buffer)); + + // 5. Return a new ArrayBuffer object, created in the current Realm, whose [[ArrayBufferData]] internal slot value is arrayBufferData and whose [[ArrayBufferByteLength]] internal slot value is arrayBufferByteLength. + return JS::ArrayBuffer::create(realm, array_buffer); +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_detached_pull_into_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor) +{ + // 1. Assert: pullIntoDescriptor’s reader type is "none". + VERIFY(pull_into_descriptor.reader_type == ReaderType::None); + + // 2. If pullIntoDescriptor’s bytes filled > 0, perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled). + if (pull_into_descriptor.bytes_filled > 0) + TRY(readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(controller, pull_into_descriptor.buffer, pull_into_descriptor.byte_offset, pull_into_descriptor.bytes_filled)); + + // 3. Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller). + readable_byte_stream_controller_shift_pending_pull_into(controller); + return {}; +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue +WebIDL::ExceptionOr readable_byte_stream_controller_process_read_requests_using_queue(ReadableByteStreamController& controller) +{ + // 1. Let reader be controller.[[stream]].[[reader]]. + auto reader = controller.stream()->reader(); + + // 2. Assert: reader implements ReadableStreamDefaultReader. + VERIFY(reader->has>()); + + // 3. While reader.[[readRequests]] is not empty, + auto readable_stream_default_reader = reader->get>(); + while (!readable_stream_default_reader->read_requests().is_empty()) { + // 1. If controller.[[queueTotalSize]] is 0, return. + if (controller.queue_total_size() == 0.0) + return {}; + + // 2. Let readRequest be reader.[[readRequests]][0]. + // 3. Remove readRequest from reader.[[readRequests]]. + auto read_request = readable_stream_default_reader->read_requests().take_first(); + + // 4. Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest). + TRY(readable_byte_stream_controller_fill_read_request_from_queue(controller, read_request)); + } + + return {}; +} + +// https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue +void readable_byte_stream_controller_enqueue_chunk_to_queue(ReadableByteStreamController& controller, JS::NonnullGCPtr buffer, u32 byte_offset, u32 byte_length) +{ + // 1. Append a new readable byte stream queue entry with buffer buffer, byte offset byteOffset, and byte length byteLength to controller.[[queue]]. + controller.queue().append(ReadableByteStreamQueueEntry { + .buffer = buffer, + .byte_offset = byte_offset, + .byte_length = byte_length, + }); + + // 2. Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength. + controller.set_queue_total_size(controller.queue_total_size() + byte_length); +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(ReadableByteStreamController& controller, JS::ArrayBuffer& buffer, u64 byte_offset, u64 byte_length) +{ + auto& vm = controller.vm(); + + // 1. Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%). + auto clone_result = JS::clone_array_buffer(vm, buffer, byte_offset, byte_length); + + // 2. If cloneResult is an abrupt completion, + if (clone_result.is_throw_completion()) { + auto throw_completion = Bindings::throw_dom_exception_if_needed(vm, [&] { return clone_result; }).throw_completion(); + + // 1. Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]). + readable_byte_stream_controller_error(controller, throw_completion.value().value()); + + // 2. Return cloneResult. + // Note: We need to return the throw_completion object here, as enqueue needs to throw the same object that the controller is errored with + return throw_completion; + } + + // 3. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength). + readable_byte_stream_controller_enqueue_chunk_to_queue(controller, *clone_result.release_value(), 0, byte_length); + + return {}; +} + +// https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into +PullIntoDescriptor readable_byte_stream_controller_shift_pending_pull_into(ReadableByteStreamController& controller) +{ + // 1. Assert: controller.[[byobRequest]] is null. + VERIFY(!controller.byob_request()); + + // 2. Let descriptor be controller.[[pendingPullIntos]][0]. + // 3. Remove descriptor from controller.[[pendingPullIntos]]. + auto descriptor = controller.pending_pull_intos().take_first(); + + // 4. Return descriptor. + return descriptor; +} + // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support WebIDL::ExceptionOr set_up_readable_stream_controller_with_byte_reading_support(ReadableStream& stream, Optional&& pull_algorithm, Optional&& cancel_algorithm, double high_water_mark) { diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h index 0fee0134cf..6ff90c6e5c 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h @@ -1,6 +1,7 @@ /* * Copyright (c) 2022, Linus Groh * Copyright (c) 2023, Matthew Olsson + * Copyright (c) 2023, Shannon Booth * * SPDX-License-Identifier: BSD-2-Clause */ @@ -59,6 +60,15 @@ WebIDL::ExceptionOr set_up_readable_stream_default_controller_from_underly WebIDL::ExceptionOr set_up_readable_stream_controller_with_byte_reading_support(ReadableStream&, Optional&& = {}, Optional&& = {}, double high_water_mark = 0); WebIDL::ExceptionOr set_up_readable_byte_stream_controller(ReadableStream&, ReadableByteStreamController&, StartAlgorithm&&, PullAlgorithm&&, CancelAlgorithm&&, double high_water_mark, JS::Value auto_allocate_chunk_size); +WebIDL::ExceptionOr readable_stream_enqueue(ReadableStreamController& controller, JS::Value chunk); +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue(ReadableByteStreamController& controller, JS::Value chunk); +WebIDL::ExceptionOr> transfer_array_buffer(JS::Realm& realm, JS::ArrayBuffer& buffer); +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_detached_pull_into_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor); +WebIDL::ExceptionOr readable_byte_stream_controller_process_read_requests_using_queue(ReadableByteStreamController& controller); +void readable_byte_stream_controller_enqueue_chunk_to_queue(ReadableByteStreamController& controller, JS::NonnullGCPtr buffer, u32 byte_offset, u32 byte_length); +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(ReadableByteStreamController& controller, JS::ArrayBuffer& buffer, u64 byte_offset, u64 byte_length); +PullIntoDescriptor readable_byte_stream_controller_shift_pending_pull_into(ReadableByteStreamController& controller); + WebIDL::ExceptionOr readable_byte_stream_controller_call_pull_if_needed(ReadableByteStreamController&); void readable_byte_stream_controller_clear_algorithms(ReadableByteStreamController&); void readable_byte_stream_controller_clear_pending_pull_intos(ReadableByteStreamController&);