From 94883866f5b7ca91b113ddac1c05afad3dd4a3e0 Mon Sep 17 00:00:00 2001 From: Shannon Booth Date: Tue, 13 Jun 2023 07:26:59 +1200 Subject: [PATCH] LibWeb: Implement Web::Streams::readable_stream_enqueue AO This AO will be used in the Web::FileAPI::Blob::get_stream() implementation to enqueue all data in the blob to the stream. There are still plenty of cases to handle, but this appears to be enough for the basic case of reading all chunks from a readable stream until it is done. --- .../LibWeb/Streams/AbstractOperations.cpp | 257 ++++++++++++++++++ .../LibWeb/Streams/AbstractOperations.h | 10 + 2 files changed, 267 insertions(+) diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp index bce3844dde..e1a2957c07 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -6,6 +6,7 @@ * SPDX-License-Identifier: BSD-2-Clause */ +#include #include #include #include @@ -1261,6 +1262,262 @@ WebIDL::ExceptionOr set_up_readable_byte_stream_controller(ReadableStream& return {}; } +// https://streams.spec.whatwg.org/#readablestream-enqueue +WebIDL::ExceptionOr readable_stream_enqueue(ReadableStreamController& controller, JS::Value chunk) +{ + // 1. If stream.[[controller]] implements ReadableStreamDefaultController, + if (controller.has>()) { + // 1. Perform ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk). + return readable_stream_default_controller_enqueue(controller.get>(), chunk); + } + // 2. Otherwise, + else { + // 1. Assert: stream.[[controller]] implements ReadableByteStreamController. + VERIFY(controller.has>()); + auto readable_byte_controller = controller.get>(); + + // FIXME: 2. Assert: chunk is an ArrayBufferView. + + // 3. Let byobView be the current BYOB request view for stream. + auto byob_view = readable_byte_controller->byob_request(); + + // 4. If byobView is non-null, and chunk.[[ViewedArrayBuffer]] is byobView.[[ViewedArrayBuffer]], then: + if (byob_view) { + // FIXME: 1. Assert: chunk.[[ByteOffset]] is byobView.[[ByteOffset]]. + // FIXME: 2. Assert: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]]. + // FIXME: 3. Perform ? ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]]). + TODO(); + } + + // 5. Otherwise, perform ? ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk). + return readable_byte_stream_controller_enqueue(readable_byte_controller, chunk); + } +} + +// https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue(ReadableByteStreamController& controller, JS::Value chunk) +{ + auto& vm = controller.vm(); + auto& realm = controller.realm(); + + // 1. Let stream be controller.[[stream]]. + auto stream = controller.stream(); + + // 2. If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return. + if (controller.close_requested() || stream->state() != ReadableStream ::State::Readable) + return {}; + + // 3. Let buffer be chunk.[[ViewedArrayBuffer]]. + auto* typed_array = TRY(JS::typed_array_from(vm, chunk)); + auto* buffer = typed_array->viewed_array_buffer(); + + // 4. Let byteOffset be chunk.[[ByteOffset]]. + auto byte_offset = typed_array->byte_offset(); + + // 5. Let byteLength be chunk.[[ByteLength]]. + auto byte_length = typed_array->byte_length(); + + // 6. If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception. + if (buffer->is_detached()) { + auto error = MUST_OR_THROW_OOM(JS::TypeError::create(realm, "Buffer is detached"sv)); + return JS::throw_completion(error); + } + + // 7. Let transferredBuffer be ? TransferArrayBuffer(buffer). + auto transferred_buffer = TRY(transfer_array_buffer(realm, *buffer)); + + // 8. If controller.[[pendingPullIntos]] is not empty, + if (!controller.pending_pull_intos().is_empty()) { + // 1. Let firstPendingPullInto be controller.[[pendingPullIntos]][0]. + auto& first_pending_pull_into = controller.pending_pull_intos().first(); + + // 2. If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception. + if (first_pending_pull_into.buffer->is_detached()) { + auto error = MUST_OR_THROW_OOM(JS::TypeError::create(realm, "Buffer is detached"sv)); + return JS::throw_completion(error); + } + + // 3. Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller). + readable_byte_stream_controller_invalidate_byob_request(controller); + + // 4. Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer). + first_pending_pull_into.buffer = TRY(transfer_array_buffer(realm, first_pending_pull_into.buffer)); + + // 5. If firstPendingPullInto’s reader type is "none", perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto). + if (first_pending_pull_into.reader_type == ReaderType::None) + TRY(readable_byte_stream_controller_enqueue_detached_pull_into_queue(controller, first_pending_pull_into)); + } + + // 9. If ! ReadableStreamHasDefaultReader(stream) is true, + if (readable_stream_has_default_reader(*stream)) { + // 1. Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller). + TRY(readable_byte_stream_controller_process_read_requests_using_queue(controller)); + + // 2. If ! ReadableStreamGetNumReadRequests(stream) is 0, + if (readable_stream_get_num_read_requests(*stream) == 0) { + // 1. Assert: controller.[[pendingPullIntos]] is empty. + VERIFY(controller.pending_pull_intos().is_empty()); + + // 2. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength). + readable_byte_stream_controller_enqueue_chunk_to_queue(controller, transferred_buffer, byte_offset, byte_length); + } + // 3. Otherwise. + else { + // 1. Assert: controller.[[queue]] is empty. + VERIFY(controller.queue().is_empty()); + + // 2. If controller.[[pendingPullIntos]] is not empty, + if (!controller.pending_pull_intos().is_empty()) { + // 1. Assert: controller.[[pendingPullIntos]][0]'s reader type is "default". + VERIFY(controller.pending_pull_intos().first().reader_type == ReaderType::Default); + + // 2. Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller). + readable_byte_stream_controller_shift_pending_pull_into(controller); + } + + // 3. Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »). + auto transferred_view = MUST_OR_THROW_OOM(JS::construct(vm, *realm.intrinsics().uint8_array_constructor(), transferred_buffer, JS::Value(byte_offset), JS::Value(byte_length))); + + // 4. Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false). + readable_stream_fulfill_read_request(*stream, transferred_view, false); + } + } + // 10. Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true, + else if (readable_stream_has_byob_reader(*stream)) { + // FIXME: 1. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength). + // FIXME: 2. Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller). + TODO(); + } + // 11. Otherwise, + else { + // 1. Assert: ! IsReadableStreamLocked(stream) is false. + VERIFY(!is_readable_stream_locked(*stream)); + + // 2. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength). + readable_byte_stream_controller_enqueue_chunk_to_queue(controller, transferred_buffer, byte_offset, byte_length); + } + + // 12. Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller). + TRY(readable_byte_stream_controller_call_pull_if_needed(controller)); + + return {}; +} + +// https://streams.spec.whatwg.org/#transfer-array-buffer +WebIDL::ExceptionOr> transfer_array_buffer(JS::Realm& realm, JS::ArrayBuffer& buffer) +{ + auto& vm = realm.vm(); + + // 1. Assert: ! IsDetachedBuffer(O) is false. + VERIFY(!buffer.is_detached()); + + // 2. Let arrayBufferData be O.[[ArrayBufferData]]. + // 3. Let arrayBufferByteLength be O.[[ArrayBufferByteLength]]. + auto array_buffer = buffer.buffer(); + + // 4. Perform ? DetachArrayBuffer(O). + TRY(JS::detach_array_buffer(vm, buffer)); + + // 5. Return a new ArrayBuffer object, created in the current Realm, whose [[ArrayBufferData]] internal slot value is arrayBufferData and whose [[ArrayBufferByteLength]] internal slot value is arrayBufferByteLength. + return JS::ArrayBuffer::create(realm, array_buffer); +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_detached_pull_into_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor) +{ + // 1. Assert: pullIntoDescriptor’s reader type is "none". + VERIFY(pull_into_descriptor.reader_type == ReaderType::None); + + // 2. If pullIntoDescriptor’s bytes filled > 0, perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled). + if (pull_into_descriptor.bytes_filled > 0) + TRY(readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(controller, pull_into_descriptor.buffer, pull_into_descriptor.byte_offset, pull_into_descriptor.bytes_filled)); + + // 3. Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller). + readable_byte_stream_controller_shift_pending_pull_into(controller); + return {}; +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue +WebIDL::ExceptionOr readable_byte_stream_controller_process_read_requests_using_queue(ReadableByteStreamController& controller) +{ + // 1. Let reader be controller.[[stream]].[[reader]]. + auto reader = controller.stream()->reader(); + + // 2. Assert: reader implements ReadableStreamDefaultReader. + VERIFY(reader->has>()); + + // 3. While reader.[[readRequests]] is not empty, + auto readable_stream_default_reader = reader->get>(); + while (!readable_stream_default_reader->read_requests().is_empty()) { + // 1. If controller.[[queueTotalSize]] is 0, return. + if (controller.queue_total_size() == 0.0) + return {}; + + // 2. Let readRequest be reader.[[readRequests]][0]. + // 3. Remove readRequest from reader.[[readRequests]]. + auto read_request = readable_stream_default_reader->read_requests().take_first(); + + // 4. Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest). + TRY(readable_byte_stream_controller_fill_read_request_from_queue(controller, read_request)); + } + + return {}; +} + +// https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue +void readable_byte_stream_controller_enqueue_chunk_to_queue(ReadableByteStreamController& controller, JS::NonnullGCPtr buffer, u32 byte_offset, u32 byte_length) +{ + // 1. Append a new readable byte stream queue entry with buffer buffer, byte offset byteOffset, and byte length byteLength to controller.[[queue]]. + controller.queue().append(ReadableByteStreamQueueEntry { + .buffer = buffer, + .byte_offset = byte_offset, + .byte_length = byte_length, + }); + + // 2. Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength. + controller.set_queue_total_size(controller.queue_total_size() + byte_length); +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(ReadableByteStreamController& controller, JS::ArrayBuffer& buffer, u64 byte_offset, u64 byte_length) +{ + auto& vm = controller.vm(); + + // 1. Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%). + auto clone_result = JS::clone_array_buffer(vm, buffer, byte_offset, byte_length); + + // 2. If cloneResult is an abrupt completion, + if (clone_result.is_throw_completion()) { + auto throw_completion = Bindings::throw_dom_exception_if_needed(vm, [&] { return clone_result; }).throw_completion(); + + // 1. Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]). + readable_byte_stream_controller_error(controller, throw_completion.value().value()); + + // 2. Return cloneResult. + // Note: We need to return the throw_completion object here, as enqueue needs to throw the same object that the controller is errored with + return throw_completion; + } + + // 3. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength). + readable_byte_stream_controller_enqueue_chunk_to_queue(controller, *clone_result.release_value(), 0, byte_length); + + return {}; +} + +// https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into +PullIntoDescriptor readable_byte_stream_controller_shift_pending_pull_into(ReadableByteStreamController& controller) +{ + // 1. Assert: controller.[[byobRequest]] is null. + VERIFY(!controller.byob_request()); + + // 2. Let descriptor be controller.[[pendingPullIntos]][0]. + // 3. Remove descriptor from controller.[[pendingPullIntos]]. + auto descriptor = controller.pending_pull_intos().take_first(); + + // 4. Return descriptor. + return descriptor; +} + // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support WebIDL::ExceptionOr set_up_readable_stream_controller_with_byte_reading_support(ReadableStream& stream, Optional&& pull_algorithm, Optional&& cancel_algorithm, double high_water_mark) { diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h index 0fee0134cf..6ff90c6e5c 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h @@ -1,6 +1,7 @@ /* * Copyright (c) 2022, Linus Groh * Copyright (c) 2023, Matthew Olsson + * Copyright (c) 2023, Shannon Booth * * SPDX-License-Identifier: BSD-2-Clause */ @@ -59,6 +60,15 @@ WebIDL::ExceptionOr set_up_readable_stream_default_controller_from_underly WebIDL::ExceptionOr set_up_readable_stream_controller_with_byte_reading_support(ReadableStream&, Optional&& = {}, Optional&& = {}, double high_water_mark = 0); WebIDL::ExceptionOr set_up_readable_byte_stream_controller(ReadableStream&, ReadableByteStreamController&, StartAlgorithm&&, PullAlgorithm&&, CancelAlgorithm&&, double high_water_mark, JS::Value auto_allocate_chunk_size); +WebIDL::ExceptionOr readable_stream_enqueue(ReadableStreamController& controller, JS::Value chunk); +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue(ReadableByteStreamController& controller, JS::Value chunk); +WebIDL::ExceptionOr> transfer_array_buffer(JS::Realm& realm, JS::ArrayBuffer& buffer); +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_detached_pull_into_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor); +WebIDL::ExceptionOr readable_byte_stream_controller_process_read_requests_using_queue(ReadableByteStreamController& controller); +void readable_byte_stream_controller_enqueue_chunk_to_queue(ReadableByteStreamController& controller, JS::NonnullGCPtr buffer, u32 byte_offset, u32 byte_length); +WebIDL::ExceptionOr readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(ReadableByteStreamController& controller, JS::ArrayBuffer& buffer, u64 byte_offset, u64 byte_length); +PullIntoDescriptor readable_byte_stream_controller_shift_pending_pull_into(ReadableByteStreamController& controller); + WebIDL::ExceptionOr readable_byte_stream_controller_call_pull_if_needed(ReadableByteStreamController&); void readable_byte_stream_controller_clear_algorithms(ReadableByteStreamController&); void readable_byte_stream_controller_clear_pending_pull_intos(ReadableByteStreamController&);