diff --git a/Tests/LibWeb/Text/expected/Streams/ReadableByteStream-byob-tee.txt b/Tests/LibWeb/Text/expected/Streams/ReadableByteStream-byob-tee.txt new file mode 100644 index 0000000000..f33d47bd45 --- /dev/null +++ b/Tests/LibWeb/Text/expected/Streams/ReadableByteStream-byob-tee.txt @@ -0,0 +1,6 @@ +stream1: abcdefghijklmnopqrstuvwxyz +stream1: ABCDEFGHIJKLMNOPQRSTUVWXYZ +stream1: 0123456789!@#$%^&*()-=_+,< +stream1: Done! +stream2: abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()-=_+,< +stream2: Done! diff --git a/Tests/LibWeb/Text/expected/Streams/ReadableByteStream-default-tee.txt b/Tests/LibWeb/Text/expected/Streams/ReadableByteStream-default-tee.txt new file mode 100644 index 0000000000..daeb8e4bb1 --- /dev/null +++ b/Tests/LibWeb/Text/expected/Streams/ReadableByteStream-default-tee.txt @@ -0,0 +1,8 @@ +stream1: abcdefghijklmnopqrstuvwxyz +stream1: ABCDEFGHIJKLMNOPQRSTUVWXYZ +stream1: 0123456789!@#$%^&*()-=_+,< +stream1: Done! +stream2: abcdefghijklmnopqrstuvwxyz +stream2: ABCDEFGHIJKLMNOPQRSTUVWXYZ +stream2: 0123456789!@#$%^&*()-=_+,< +stream2: Done! diff --git a/Tests/LibWeb/Text/input/Streams/ReadableByteStream-byob-tee.html b/Tests/LibWeb/Text/input/Streams/ReadableByteStream-byob-tee.html new file mode 100644 index 0000000000..333f910e8a --- /dev/null +++ b/Tests/LibWeb/Text/input/Streams/ReadableByteStream-byob-tee.html @@ -0,0 +1,84 @@ + + diff --git a/Tests/LibWeb/Text/input/Streams/ReadableByteStream-default-tee.html b/Tests/LibWeb/Text/input/Streams/ReadableByteStream-default-tee.html new file mode 100644 index 0000000000..b425850b83 --- /dev/null +++ b/Tests/LibWeb/Text/input/Streams/ReadableByteStream-default-tee.html @@ -0,0 +1,64 @@ + + diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp index 025b1c216a..47331d6fb9 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -249,7 +249,7 @@ WebIDL::ExceptionOr readable_stream_tee(JS::Realm& realm, Re // 3. If stream.[[controller]] implements ReadableByteStreamController, return ? ReadableByteStreamTee(stream). if (stream.controller()->has>()) { - return realm.vm().throw_completion(JS::ErrorType::NotImplemented, "Byte stream teeing"); + return TRY(readable_byte_stream_tee(realm, stream)); } // 4. Return ? ReadableStreamDefaultTee(stream, cloneForBranch2). @@ -557,6 +557,627 @@ WebIDL::ExceptionOr readable_stream_default_tee(JS::Realm& r return ReadableStreamPair { *params->branch1, *params->branch2 }; } +struct ByteStreamTeeParams final : JS::Cell { + JS_CELL(TeeParams, JS::Cell); + JS_DECLARE_ALLOCATOR(ByteStreamTeeParams); + + explicit ByteStreamTeeParams(ReadableStreamReader reader) + : reader(move(reader)) + { + } + + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(reason1); + visitor.visit(reason2); + visitor.visit(branch1); + visitor.visit(branch2); + visitor.visit(pull1_algorithm); + visitor.visit(pull2_algorithm); + reader.visit([&](auto const& underlying_reader) { visitor.visit(underlying_reader); }); + } + + bool reading { false }; + bool read_again_for_branch1 { false }; + bool read_again_for_branch2 { false }; + bool canceled1 { false }; + bool canceled2 { false }; + JS::Value reason1 { JS::js_undefined() }; + JS::Value reason2 { JS::js_undefined() }; + JS::GCPtr branch1; + JS::GCPtr branch2; + JS::GCPtr pull1_algorithm; + JS::GCPtr pull2_algorithm; + ReadableStreamReader reader; +}; + +JS_DEFINE_ALLOCATOR(ByteStreamTeeParams); + +// https://streams.spec.whatwg.org/#ref-for-read-request④ +class ByteStreamTeeDefaultReadRequest final : public ReadRequest { + JS_CELL(ByteStreamTeeDefaultReadRequest, Cell); + JS_DECLARE_ALLOCATOR(ByteStreamTeeDefaultReadRequest); + +public: + ByteStreamTeeDefaultReadRequest( + JS::Realm& realm, + JS::NonnullGCPtr stream, + JS::NonnullGCPtr params, + JS::NonnullGCPtr cancel_promise) + : m_realm(realm) + , m_stream(stream) + , m_params(params) + , m_cancel_promise(cancel_promise) + { + } + + // https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps④ + virtual void on_chunk(JS::Value chunk) override + { + // 1. Queue a microtask to perform the following steps: + HTML::queue_a_microtask(nullptr, [this, chunk]() mutable { + HTML::TemporaryExecutionContext execution_context { Bindings::host_defined_environment_settings_object(m_realm) }; + + auto controller1 = m_params->branch1->controller()->get>(); + auto controller2 = m_params->branch2->controller()->get>(); + + // 1. Set readAgainForBranch1 to false. + m_params->read_again_for_branch1 = false; + + // 2. Set readAgainForBranch2 to false. + m_params->read_again_for_branch2 = false; + + // 3. Let chunk1 and chunk2 be chunk. + auto chunk1 = chunk; + auto chunk2 = chunk; + + // 4. If canceled1 is false and canceled2 is false, + if (!m_params->canceled1 && !m_params->canceled2) { + // 1. Let cloneResult be CloneAsUint8Array(chunk). + auto chunk_view = m_realm->vm().heap().allocate(m_realm, chunk.as_object()); + auto clone_result = clone_as_uint8_array(m_realm, chunk_view); + + // 2. If cloneResult is an abrupt completion, + if (clone_result.is_exception()) { + auto completion = Bindings::dom_exception_to_throw_completion(m_realm->vm(), clone_result.release_error()); + + // 1. Perform ! ReadableByteStreamControllerError(branch1.[[controller]], cloneResult.[[Value]]). + readable_byte_stream_controller_error(controller1, completion.value().value()); + + // 2. Perform ! ReadableByteStreamControllerError(branch2.[[controller]], cloneResult.[[Value]]). + readable_byte_stream_controller_error(controller2, completion.value().value()); + + // 3. Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]). + auto cancel_result = MUST(readable_stream_cancel(m_stream, completion.value().value())); + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + + WebIDL::resolve_promise(m_realm, m_cancel_promise, cancel_value); + + // 4. Return. + return; + } + + // 3. Otherwise, set chunk2 to cloneResult.[[Value]]. + chunk2 = clone_result.release_value(); + } + + // 5. If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1). + if (!m_params->canceled1) { + MUST(readable_byte_stream_controller_enqueue(controller1, chunk1)); + } + + // 6. If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2). + if (!m_params->canceled2) { + MUST(readable_byte_stream_controller_enqueue(controller2, chunk2)); + } + + // 7. Set reading to false. + m_params->reading = false; + + // 8. If readAgainForBranch1 is true, perform pull1Algorithm. + if (m_params->read_again_for_branch1) { + MUST(m_params->pull1_algorithm->function()()); + } + // 9. Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm. + else if (m_params->read_again_for_branch2) { + MUST(m_params->pull2_algorithm->function()()); + } + }); + + // NOTE: The microtask delay here is necessary because it takes at least a microtask to detect errors, when we + // use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we + // cannot let successful synchronously-available reads happen ahead of asynchronously-available errors. + } + + // https://streams.spec.whatwg.org/#ref-for-read-request-close-steps③ + virtual void on_close() override + { + auto controller1 = m_params->branch1->controller()->get>(); + auto controller2 = m_params->branch2->controller()->get>(); + + // 1. Set reading to false. + m_params->reading = false; + + // 2. If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]). + if (!m_params->canceled1) { + MUST(readable_byte_stream_controller_close(controller1)); + } + + // 3. If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]). + if (!m_params->canceled2) { + MUST(readable_byte_stream_controller_close(controller2)); + } + + // 4. If branch1.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0). + if (!controller1->pending_pull_intos().is_empty()) { + MUST(readable_byte_stream_controller_respond(controller1, 0)); + } + + // 5. If branch2.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0). + if (!controller2->pending_pull_intos().is_empty()) { + MUST(readable_byte_stream_controller_respond(controller2, 0)); + } + + // 6. If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined. + if (!m_params->canceled1 || !m_params->canceled2) { + WebIDL::resolve_promise(m_realm, m_cancel_promise, JS::js_undefined()); + } + } + + // https://streams.spec.whatwg.org/#ref-for-read-request-error-steps④ + virtual void on_error(JS::Value) override + { + // 1. Set reading to false. + m_params->reading = false; + } + +private: + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(m_realm); + visitor.visit(m_stream); + visitor.visit(m_params); + visitor.visit(m_cancel_promise); + } + + JS::NonnullGCPtr m_realm; + JS::NonnullGCPtr m_stream; + JS::NonnullGCPtr m_params; + JS::NonnullGCPtr m_cancel_promise; +}; + +JS_DEFINE_ALLOCATOR(ByteStreamTeeDefaultReadRequest); + +// https://streams.spec.whatwg.org/#ref-for-read-into-request② +class ByteStreamTeeBYOBReadRequest final : public ReadIntoRequest { + JS_CELL(ByteStreamTeeBYOBReadRequest, Cell); + JS_DECLARE_ALLOCATOR(ByteStreamTeeBYOBReadRequest); + +public: + ByteStreamTeeBYOBReadRequest( + JS::Realm& realm, + JS::NonnullGCPtr stream, + JS::NonnullGCPtr params, + JS::NonnullGCPtr cancel_promise, + JS::NonnullGCPtr byob_branch, + JS::NonnullGCPtr other_branch, + bool for_branch2) + : m_realm(realm) + , m_stream(stream) + , m_params(params) + , m_cancel_promise(cancel_promise) + , m_byob_branch(byob_branch) + , m_other_branch(other_branch) + , m_for_branch2(for_branch2) + { + } + + // https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps① + virtual void on_chunk(JS::Value chunk) override + { + auto chunk_view = m_realm->vm().heap().allocate(m_realm, chunk.as_object()); + + // 1. Queue a microtask to perform the following steps: + HTML::queue_a_microtask(nullptr, [this, chunk = chunk_view]() { + HTML::TemporaryExecutionContext execution_context { Bindings::host_defined_environment_settings_object(m_realm) }; + + auto byob_controller = m_byob_branch->controller()->get>(); + auto other_controller = m_other_branch->controller()->get>(); + + // 1. Set readAgainForBranch1 to false. + m_params->read_again_for_branch1 = false; + + // 2. Set readAgainForBranch2 to false. + m_params->read_again_for_branch2 = false; + + // 3. Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise. + auto byob_cancelled = m_for_branch2 ? m_params->canceled2 : m_params->canceled1; + + // 4. Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise. + auto other_cancelled = !m_for_branch2 ? m_params->canceled2 : m_params->canceled1; + + // 5. If otherCanceled is false, + if (!other_cancelled) { + // 1. Let cloneResult be CloneAsUint8Array(chunk). + auto clone_result = clone_as_uint8_array(m_realm, chunk); + + // 2. If cloneResult is an abrupt completion, + if (clone_result.is_exception()) { + auto completion = Bindings::dom_exception_to_throw_completion(m_realm->vm(), clone_result.release_error()); + + // 1. Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]). + readable_byte_stream_controller_error(byob_controller, completion.value().value()); + + // 2. Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]). + readable_byte_stream_controller_error(other_controller, completion.value().value()); + + // 3. Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]). + auto cancel_result = MUST(readable_stream_cancel(m_stream, completion.value().value())); + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + + WebIDL::resolve_promise(m_realm, m_cancel_promise, cancel_value); + + // 4. Return. + return; + } + + // 3. Otherwise, let clonedChunk be cloneResult.[[Value]]. + auto cloned_chunk = clone_result.release_value(); + + // 4. If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk). + if (!byob_cancelled) { + MUST(readable_byte_stream_controller_respond_with_new_view(m_realm, byob_controller, chunk)); + } + + // 5. Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk). + MUST(readable_byte_stream_controller_enqueue(other_controller, cloned_chunk)); + } + // 6. Otherwise, if byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk). + else if (!byob_cancelled) { + MUST(readable_byte_stream_controller_respond_with_new_view(m_realm, byob_controller, chunk)); + } + + // 7. Set reading to false. + m_params->reading = false; + + // 8. If readAgainForBranch1 is true, perform pull1Algorithm. + if (m_params->read_again_for_branch1) { + MUST(m_params->pull1_algorithm->function()()); + } + // 9. Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm. + else if (m_params->read_again_for_branch2) { + MUST(m_params->pull2_algorithm->function()()); + } + }); + + // NOTE: The microtask delay here is necessary because it takes at least a microtask to detect errors, when we + // use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we + // cannot let successful synchronously-available reads happen ahead of asynchronously-available errors. + } + + // https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps② + virtual void on_close(JS::Value chunk) override + { + auto byob_controller = m_byob_branch->controller()->get>(); + auto other_controller = m_other_branch->controller()->get>(); + + // 1. Set reading to false. + m_params->reading = false; + + // 2. Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise. + auto byob_cancelled = m_for_branch2 ? m_params->canceled2 : m_params->canceled1; + + // 3. Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise. + auto other_cancelled = !m_for_branch2 ? m_params->canceled2 : m_params->canceled1; + + // 4. If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]). + if (!byob_cancelled) { + MUST(readable_byte_stream_controller_close(byob_controller)); + } + + // 5. If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]). + if (!other_cancelled) { + MUST(readable_byte_stream_controller_close(other_controller)); + } + + // 6. If chunk is not undefined, + if (!chunk.is_undefined()) { + // 1. Assert: chunk.[[ByteLength]] is 0. + + // 2. If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk). + if (!byob_cancelled) { + auto array_buffer_view = m_realm->vm().heap().allocate(m_realm, chunk.as_object()); + MUST(readable_byte_stream_controller_respond_with_new_view(m_realm, byob_controller, array_buffer_view)); + } + + // 3. If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty, + // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0). + if (!other_cancelled && !other_controller->pending_pull_intos().is_empty()) { + MUST(readable_byte_stream_controller_respond(other_controller, 0)); + } + } + + // 7. If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined. + if (!byob_cancelled || !other_cancelled) { + WebIDL::resolve_promise(m_realm, m_cancel_promise, JS::js_undefined()); + } + } + + // https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps① + virtual void on_error(JS::Value) override + { + // 1. Set reading to false. + m_params->reading = false; + } + +private: + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(m_realm); + visitor.visit(m_stream); + visitor.visit(m_params); + visitor.visit(m_cancel_promise); + visitor.visit(m_byob_branch); + visitor.visit(m_other_branch); + } + + JS::NonnullGCPtr m_realm; + JS::NonnullGCPtr m_stream; + JS::NonnullGCPtr m_params; + JS::NonnullGCPtr m_cancel_promise; + JS::NonnullGCPtr m_byob_branch; + JS::NonnullGCPtr m_other_branch; + bool m_for_branch2 { false }; +}; + +JS_DEFINE_ALLOCATOR(ByteStreamTeeBYOBReadRequest); + +// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee +WebIDL::ExceptionOr readable_byte_stream_tee(JS::Realm& realm, ReadableStream& stream) +{ + // 1. Assert: stream implements ReadableStream. + // 2. Assert: stream.[[controller]] implements ReadableByteStreamController. + VERIFY(stream.controller().has_value() && stream.controller()->has>()); + + // 3. Let reader be ? AcquireReadableStreamDefaultReader(stream). + auto reader = TRY(acquire_readable_stream_default_reader(stream)); + + // 4. Let reading be false. + // 5. Let readAgainForBranch1 be false. + // 6. Let readAgainForBranch2 be false. + // 7. Let canceled1 be false. + // 8. Let canceled2 be false. + // 9. Let reason1 be undefined. + // 10. Let reason2 be undefined. + // 11. Let branch1 be undefined. + // 12. Let branch2 be undefined. + auto params = realm.heap().allocate(realm, reader); + + // 13. Let cancelPromise be a new promise. + auto cancel_promise = WebIDL::create_promise(realm); + + // 14. Let forwardReaderError be the following steps, taking a thisReader argument: + auto forward_reader_error = JS::create_heap_function(realm.heap(), [&realm, params, cancel_promise](ReadableStreamReader const& this_reader) { + // 1. Upon rejection of thisReader.[[closedPromise]] with reason r, + auto closed_promise = this_reader.visit([](auto const& underlying_reader) { return underlying_reader->closed_promise_capability(); }); + + WebIDL::upon_rejection(*closed_promise, [&realm, this_reader, params, cancel_promise](auto reason) -> WebIDL::ExceptionOr { + auto controller1 = params->branch1->controller()->get>(); + auto controller2 = params->branch2->controller()->get>(); + + // 1. If thisReader is not reader, return. + if (this_reader != params->reader) { + return JS::js_undefined(); + } + + // 2. Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r). + readable_byte_stream_controller_error(controller1, reason); + + // 3. Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r). + readable_byte_stream_controller_error(controller2, reason); + + // 4. If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined. + if (!params->canceled1 || !params->canceled2) { + WebIDL::resolve_promise(realm, cancel_promise, JS::js_undefined()); + } + + return JS::js_undefined(); + }); + }); + + // 15. Let pullWithDefaultReader be the following steps: + auto pull_with_default_reader = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise, forward_reader_error]() mutable { + // 1. If reader implements ReadableStreamBYOBReader, + if (auto const* byob_reader = params->reader.get_pointer>()) { + // 1. Assert: reader.[[readIntoRequests]] is empty. + VERIFY((*byob_reader)->read_into_requests().is_empty()); + + // 2. Perform ! ReadableStreamBYOBReaderRelease(reader). + readable_stream_byob_reader_release(*byob_reader); + + // 3. Set reader to ! AcquireReadableStreamDefaultReader(stream). + params->reader = MUST(acquire_readable_stream_default_reader(stream)); + + // 4. Perform forwardReaderError, given reader. + forward_reader_error->function()(params->reader); + } + + // 2. Let readRequest be a read request with the following items: + auto read_request = realm.heap().allocate_without_realm(realm, stream, params, cancel_promise); + + // 3. Perform ! ReadableStreamDefaultReaderRead(reader, readRequest). + MUST(readable_stream_default_reader_read(params->reader.get>(), read_request)); + }); + + // 16. Let pullWithBYOBReader be the following steps, given view and forBranch2: + auto pull_with_byob_reader = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise, forward_reader_error](JS::NonnullGCPtr view, bool for_branch2) mutable { + // 1. If reader implements ReadableStreamDefaultReader, + if (auto const* default_reader = params->reader.get_pointer>()) { + // 2. Assert: reader.[[readRequests]] is empty. + VERIFY((*default_reader)->read_requests().is_empty()); + + // 3. Perform ! ReadableStreamDefaultReaderRelease(reader). + MUST(readable_stream_default_reader_release(*default_reader)); + + // 4. Set reader to ! AcquireReadableStreamBYOBReader(stream). + params->reader = MUST(acquire_readable_stream_byob_reader(stream)); + + // 5. Perform forwardReaderError, given reader. + forward_reader_error->function()(params->reader); + }; + + // 2. Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise. + auto byob_branch = for_branch2 ? params->branch2 : params->branch1; + + // 3. Let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise. + auto other_branch = !for_branch2 ? params->branch2 : params->branch1; + + // 4. Let readIntoRequest be a read-into request with the following items: + auto read_into_request = realm.heap().allocate_without_realm(realm, stream, params, cancel_promise, *byob_branch, *other_branch, for_branch2); + + // 5. Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest). + readable_stream_byob_reader_read(params->reader.get>(), view, read_into_request); + }); + + // 17. Let pull1Algorithm be the following steps: + auto pull1_algorithm = JS::create_heap_function(realm.heap(), [&realm, params, pull_with_default_reader, pull_with_byob_reader]() -> WebIDL::ExceptionOr> { + auto controller1 = params->branch1->controller()->get>(); + + // 1. If reading is true, + if (params->reading) { + // 1. Set readAgainForBranch1 to true. + params->read_again_for_branch1 = true; + + // 2. Return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + } + + // 2. Set reading to true. + params->reading = true; + + // 3. Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]). + auto byob_request = readable_byte_stream_controller_get_byob_request(controller1); + + // 4. If byobRequest is null, perform pullWithDefaultReader. + if (!byob_request) { + pull_with_default_reader->function()(); + } + // 5. Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false. + else { + pull_with_byob_reader->function()(*byob_request->view(), false); + } + + // 6. Return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + }); + + // 18. Let pull2Algorithm be the following steps: + auto pull2_algorithm = JS::create_heap_function(realm.heap(), [&realm, params, pull_with_default_reader, pull_with_byob_reader]() -> WebIDL::ExceptionOr> { + auto controller2 = params->branch2->controller()->get>(); + + // 1. If reading is true, + if (params->reading) { + // 1. Set readAgainForBranch2 to true. + params->read_again_for_branch2 = true; + + // 2. Return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + } + + // 2. Set reading to true. + params->reading = true; + + // 3. Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]). + auto byob_request = readable_byte_stream_controller_get_byob_request(controller2); + + // 4. If byobRequest is null, perform pullWithDefaultReader. + if (!byob_request) { + pull_with_default_reader->function()(); + } + // 5. Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true. + else { + pull_with_byob_reader->function()(*byob_request->view(), true); + } + + // 6. Return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + }); + + // AD-HOC: The read requests within the pull algorithms must be able to re-invoke the pull algorithms, so cache them here. + params->pull1_algorithm = pull1_algorithm; + params->pull2_algorithm = pull2_algorithm; + + // 19. Let cancel1Algorithm be the following steps, taking a reason argument: + auto cancel1_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise](JS::Value reason) -> WebIDL::ExceptionOr> { + // 1. Set canceled1 to true. + params->canceled1 = true; + + // 2. Set reason1 to reason. + params->reason1 = reason; + + // 3. If canceled2 is true, + if (params->canceled2) { + // 1. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »). + auto composite_reason = JS::Array::create_from(realm, AK::Array { params->reason1, params->reason2 }); + + // 2. Let cancelResult be ! ReadableStreamCancel(stream, compositeReason). + auto cancel_result = MUST(readable_stream_cancel(stream, composite_reason)); + + // 3. Resolve cancelPromise with cancelResult. + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + WebIDL::resolve_promise(realm, cancel_promise, cancel_value); + } + + // 4. Return cancelPromise. + return cancel_promise; + }); + + // 20. Let cancel2Algorithm be the following steps, taking a reason argument: + auto cancel2_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise](JS::Value reason) -> WebIDL::ExceptionOr> { + // 1. Set canceled2 to true. + params->canceled2 = true; + + // 2. Set reason2 to reason. + params->reason2 = reason; + + // 3. If canceled1 is true, + if (params->canceled1) { + // 1. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »). + auto composite_reason = JS::Array::create_from(realm, AK::Array { params->reason1, params->reason2 }); + + // 2. Let cancelResult be ! ReadableStreamCancel(stream, compositeReason). + auto cancel_result = MUST(readable_stream_cancel(stream, composite_reason)); + + // 3. Resolve cancelPromise with cancelResult. + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + WebIDL::resolve_promise(realm, cancel_promise, cancel_value); + } + + // 4. Return cancelPromise. + return cancel_promise; + }); + + // 21. Let startAlgorithm be an algorithm that returns undefined. + auto start_algorithm = JS::create_heap_function(realm.heap(), []() -> WebIDL::ExceptionOr { + return JS::js_undefined(); + }); + + // 22. Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm). + params->branch1 = MUST(create_readable_byte_stream(realm, start_algorithm, pull1_algorithm, cancel1_algorithm)); + + // 23. Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm). + params->branch2 = MUST(create_readable_byte_stream(realm, start_algorithm, pull2_algorithm, cancel2_algorithm)); + + // 24. Perform forwardReaderError, given reader. + forward_reader_error->function()(reader); + + // 25. Return « branch1, branch2 ». + return ReadableStreamPair { *params->branch1, *params->branch2 }; +} + // https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function JS::NonnullGCPtr extract_size_algorithm(JS::VM& vm, QueuingStrategy const& strategy) { diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h index a554a1b7f2..cba0df38c0 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h @@ -49,6 +49,7 @@ bool readable_stream_has_default_reader(ReadableStream const&); WebIDL::ExceptionOr readable_stream_tee(JS::Realm&, ReadableStream&, bool clone_for_branch2); WebIDL::ExceptionOr readable_stream_default_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2); +WebIDL::ExceptionOr readable_byte_stream_tee(JS::Realm& realm, ReadableStream& stream); WebIDL::ExceptionOr> readable_stream_reader_generic_cancel(ReadableStreamGenericReaderMixin&, JS::Value reason); void readable_stream_reader_generic_initialize(ReadableStreamReader, ReadableStream&);