From bc4d4f0f9583521c8d4c98b98e04acb4ff1ca673 Mon Sep 17 00:00:00 2001 From: Zaggy1024 Date: Tue, 4 Jul 2023 04:55:53 -0500 Subject: [PATCH] LibAudio: Create a playback class with a PulseAudio implementation This adds an abstract `Audio::PlaybackStream` class to allow cross- platform audio playback to be done in an opaque manner by applications in both Serenity and Lagom. Currently, the only supported audio API is PulseAudio, but a Serenity implementation should be added shortly as well. --- Meta/Lagom/CMakeLists.txt | 2 + Userland/Libraries/LibAudio/CMakeLists.txt | 13 + Userland/Libraries/LibAudio/Forward.h | 1 + .../Libraries/LibAudio/PlaybackStream.cpp | 39 ++ Userland/Libraries/LibAudio/PlaybackStream.h | 72 +++ .../LibAudio/PlaybackStreamPulseAudio.cpp | 192 +++++++ .../LibAudio/PlaybackStreamPulseAudio.h | 62 +++ .../Libraries/LibAudio/PulseAudioWrappers.cpp | 476 ++++++++++++++++++ .../Libraries/LibAudio/PulseAudioWrappers.h | 184 +++++++ 9 files changed, 1041 insertions(+) create mode 100644 Userland/Libraries/LibAudio/PlaybackStream.cpp create mode 100644 Userland/Libraries/LibAudio/PlaybackStream.h create mode 100644 Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.cpp create mode 100644 Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.h create mode 100644 Userland/Libraries/LibAudio/PulseAudioWrappers.cpp create mode 100644 Userland/Libraries/LibAudio/PulseAudioWrappers.h diff --git a/Meta/Lagom/CMakeLists.txt b/Meta/Lagom/CMakeLists.txt index 3f3f144362..484420c759 100644 --- a/Meta/Lagom/CMakeLists.txt +++ b/Meta/Lagom/CMakeLists.txt @@ -143,6 +143,8 @@ if (ENABLE_LAGOM_LADYBIRD AND (ENABLE_FUZZERS OR ENABLE_COMPILER_EXPLORER_BUILD) ) endif() +CHECK_INCLUDE_FILE(pulse/pulseaudio.h HAVE_PULSEAUDIO) + if (CMAKE_CXX_COMPILER_ID MATCHES "Clang$") add_compile_options(-Wno-overloaded-virtual) # FIXME: Re-enable this check when the warning stops triggering, or document why we can't stop it from triggering. diff --git a/Userland/Libraries/LibAudio/CMakeLists.txt b/Userland/Libraries/LibAudio/CMakeLists.txt index d41ce36db0..16ab17e7ea 100644 --- a/Userland/Libraries/LibAudio/CMakeLists.txt +++ b/Userland/Libraries/LibAudio/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCES WavWriter.cpp Metadata.cpp MP3Loader.cpp + PlaybackStream.cpp QOALoader.cpp QOATypes.cpp UserSampleQueue.cpp @@ -25,5 +26,17 @@ if (SERENITYOS) ) endif() +if (HAVE_PULSEAUDIO) + list(APPEND SOURCES + PlaybackStreamPulseAudio.cpp + PulseAudioWrappers.cpp + ) +endif() + serenity_lib(LibAudio audio) target_link_libraries(LibAudio PRIVATE LibCore LibIPC LibThreading LibUnicode LibCrypto) + +if (HAVE_PULSEAUDIO) + target_link_libraries(LibAudio PRIVATE pulse) + target_compile_definitions(LibAudio PRIVATE HAVE_PULSEAUDIO=1) +endif() diff --git a/Userland/Libraries/LibAudio/Forward.h b/Userland/Libraries/LibAudio/Forward.h index 6f0d2dbc14..72f499f890 100644 --- a/Userland/Libraries/LibAudio/Forward.h +++ b/Userland/Libraries/LibAudio/Forward.h @@ -10,6 +10,7 @@ namespace Audio { class ConnectionToServer; class Loader; +class PlaybackStream; struct Sample; template diff --git a/Userland/Libraries/LibAudio/PlaybackStream.cpp b/Userland/Libraries/LibAudio/PlaybackStream.cpp new file mode 100644 index 0000000000..8811fbe2eb --- /dev/null +++ b/Userland/Libraries/LibAudio/PlaybackStream.cpp @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#include "PlaybackStream.h" + +#include + +#if defined(HAVE_PULSEAUDIO) +# include +#endif + +namespace Audio { + +#define TRY_OR_REJECT_AND_STOP(expression, promise) \ + ({ \ + auto&& __temporary_result = (expression); \ + if (__temporary_result.is_error()) [[unlikely]] { \ + (promise)->reject(__temporary_result.release_error()); \ + return 1; \ + } \ + __temporary_result.release_value(); \ + }) + +ErrorOr> PlaybackStream::create(OutputState initial_output_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback) +{ + VERIFY(data_request_callback); + // Create the platform-specific implementation for this stream. +#if defined(HAVE_PULSEAUDIO) + return PlaybackStreamPulseAudio::create(initial_output_state, sample_rate, channels, target_latency_ms, move(data_request_callback)); +#else + (void)initial_output_state, (void)sample_rate, (void)channels, (void)target_latency_ms; + return Error::from_string_literal("Audio output is not available for this platform"); +#endif +} + +} diff --git a/Userland/Libraries/LibAudio/PlaybackStream.h b/Userland/Libraries/LibAudio/PlaybackStream.h new file mode 100644 index 0000000000..0162711d9b --- /dev/null +++ b/Userland/Libraries/LibAudio/PlaybackStream.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Audio { + +enum class OutputState { + Playing, + Suspended, +}; + +// This class implements high-level audio playback behavior. It is primarily intended as an abstract cross-platform +// interface to be used by Ladybird (and its dependent libraries) for playback. +// +// The interface is designed to be simple and robust. All control functions can be called safely from any thread. +// Timing information provided by the class should allow audio timestamps to be tracked with the best accuracy possible. +class PlaybackStream : public AtomicRefCounted { +public: + using AudioDataRequestCallback = Function; + + // Creates a new audio Output class. + // + // The initial_output_state parameter determines whether it will begin playback immediately. + // + // The AudioDataRequestCallback will be called when the Output needs more audio data to fill + // its buffers and continue playback. + static ErrorOr> create(OutputState initial_output_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&&); + + virtual ~PlaybackStream() = default; + + // Sets the callback function that will be fired whenever the server consumes more data than is made available + // by the data request callback. It will fire when either the data request runs too long, or the data request + // returns no data. If all the input data has been exhausted and this event fires, that means that playback + // has ended. + virtual void set_underrun_callback(Function) = 0; + + // Resume playback from the suspended state, requesting new data for audio buffers as soon as possible. + // + // The value provided to the promise resolution will match the `total_time_played()` at the exact moment that + // the stream was resumed. + virtual NonnullRefPtr> resume() = 0; + // Completes playback of any buffered audio data and then suspends playback and buffering. + virtual NonnullRefPtr> drain_buffer_and_suspend() = 0; + // Drops any buffered audio data and then suspends playback and buffering. This can used be to stop playback + // as soon as possible instead of waiting for remaining audio to play. + virtual NonnullRefPtr> discard_buffer_and_suspend() = 0; + + // Returns a accurate monotonically-increasing time duration that is based on the number of samples that have + // been played by the output device. The value is interpolated and takes into account latency to the speakers + // whenever possible. + // + // This function should be able to run from any thread safely. + virtual ErrorOr total_time_played() = 0; + + virtual NonnullRefPtr> set_volume(double volume) = 0; +}; + +} diff --git a/Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.cpp b/Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.cpp new file mode 100644 index 0000000000..17e2d1eaa6 --- /dev/null +++ b/Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.cpp @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2023, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#include "PlaybackStreamPulseAudio.h" + +#include + +namespace Audio { + +#define TRY_OR_EXIT_THREAD(expression) \ + ({ \ + auto&& __temporary_result = (expression); \ + if (__temporary_result.is_error()) [[unlikely]] { \ + warnln("Failure in PulseAudio control thread: {}", __temporary_result.error().string_literal()); \ + internal_state->exit(); \ + return 1; \ + } \ + __temporary_result.release_value(); \ + }) + +ErrorOr> PlaybackStreamPulseAudio::create(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback) +{ + VERIFY(data_request_callback); + + // Create an internal state for the control thread to hold on to. + auto internal_state = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) InternalState())); + auto playback_stream = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) PlaybackStreamPulseAudio(internal_state))); + + // Create the control thread and start it. + auto thread = TRY(Threading::Thread::try_create([=, data_request_callback = move(data_request_callback)]() mutable { + auto context = TRY_OR_EXIT_THREAD(PulseAudioContext::instance()); + internal_state->set_stream(TRY_OR_EXIT_THREAD(context->create_stream(initial_state, sample_rate, channels, target_latency_ms, [data_request_callback = move(data_request_callback)](PulseAudioStream&, Bytes buffer, size_t sample_count) { + return data_request_callback(buffer, PcmSampleFormat::Float32, sample_count); + }))); + + // PulseAudio retains the last volume it sets for an application. We want to consistently + // start at 100% volume instead. + TRY_OR_EXIT_THREAD(internal_state->stream()->set_volume(1.0)); + + internal_state->thread_loop(); + return 0; + }, + "Audio::PlaybackStream"sv)); + + internal_state->set_thread(thread); + thread->start(); + thread->detach(); + return playback_stream; +} + +PlaybackStreamPulseAudio::PlaybackStreamPulseAudio(NonnullRefPtr state) + : m_state(move(state)) +{ +} + +PlaybackStreamPulseAudio::~PlaybackStreamPulseAudio() +{ + m_state->exit(); +} + +#define TRY_OR_REJECT(expression, ...) \ + ({ \ + auto&& __temporary_result = (expression); \ + if (__temporary_result.is_error()) [[unlikely]] { \ + promise->reject(__temporary_result.release_error()); \ + return __VA_ARGS__; \ + } \ + __temporary_result.release_value(); \ + }) + +void PlaybackStreamPulseAudio::set_underrun_callback(Function callback) +{ + m_state->enqueue([this, callback = move(callback)]() mutable { + m_state->stream()->set_underrun_callback(move(callback)); + }); +} + +NonnullRefPtr> PlaybackStreamPulseAudio::resume() +{ + auto promise = Core::ThreadedPromise::create(); + TRY_OR_REJECT(m_state->check_is_running(), promise); + m_state->enqueue([this, promise]() { + TRY_OR_REJECT(m_state->stream()->resume()); + promise->resolve(TRY_OR_REJECT(m_state->stream()->total_time_played())); + }); + return promise; +} + +NonnullRefPtr> PlaybackStreamPulseAudio::drain_buffer_and_suspend() +{ + auto promise = Core::ThreadedPromise::create(); + TRY_OR_REJECT(m_state->check_is_running(), promise); + m_state->enqueue([this, promise]() { + TRY_OR_REJECT(m_state->stream()->drain_and_suspend()); + promise->resolve(); + }); + return promise; +} + +NonnullRefPtr> PlaybackStreamPulseAudio::discard_buffer_and_suspend() +{ + auto promise = Core::ThreadedPromise::create(); + TRY_OR_REJECT(m_state->check_is_running(), promise); + m_state->enqueue([this, promise]() { + TRY_OR_REJECT(m_state->stream()->flush_and_suspend()); + promise->resolve(); + }); + return promise; +} + +ErrorOr PlaybackStreamPulseAudio::total_time_played() +{ + if (m_state->stream() != nullptr) + return m_state->stream()->total_time_played(); + return Duration::zero(); +} + +NonnullRefPtr> PlaybackStreamPulseAudio::set_volume(double volume) +{ + auto promise = Core::ThreadedPromise::create(); + TRY_OR_REJECT(m_state->check_is_running(), promise); + m_state->enqueue([this, promise, volume]() { + TRY_OR_REJECT(m_state->stream()->set_volume(volume)); + promise->resolve(); + }); + return promise; +} + +ErrorOr PlaybackStreamPulseAudio::InternalState::check_is_running() +{ + if (m_exit) + return Error::from_string_literal("PulseAudio control thread loop is not running"); + return {}; +} + +void PlaybackStreamPulseAudio::InternalState::set_thread(NonnullRefPtr const& thread) +{ + Threading::MutexLocker locker { m_mutex }; + m_thread = thread; +} + +void PlaybackStreamPulseAudio::InternalState::set_stream(NonnullRefPtr const& stream) +{ + m_stream = stream; +} + +RefPtr PlaybackStreamPulseAudio::InternalState::stream() +{ + return m_stream; +} + +void PlaybackStreamPulseAudio::InternalState::enqueue(Function&& task) +{ + Threading::MutexLocker locker { m_mutex }; + m_tasks.enqueue(forward>(task)); + m_wake_condition.signal(); +} + +void PlaybackStreamPulseAudio::InternalState::thread_loop() +{ + while (true) { + auto task = [this]() -> Function { + Threading::MutexLocker locker { m_mutex }; + + while (m_tasks.is_empty() && !m_exit) + m_wake_condition.wait(); + if (m_exit) + return nullptr; + return m_tasks.dequeue(); + }(); + if (!task) { + VERIFY(m_exit); + break; + } + task(); + } + + // Stop holding onto our thread so it can be deleted. + Threading::MutexLocker locker { m_mutex }; + m_thread = nullptr; +} + +void PlaybackStreamPulseAudio::InternalState::exit() +{ + m_exit = true; + m_wake_condition.signal(); +} + +} diff --git a/Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.h b/Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.h new file mode 100644 index 0000000000..001149987f --- /dev/null +++ b/Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include +#include + +namespace Audio { + +class PlaybackStreamPulseAudio final + : public PlaybackStream { +public: + static ErrorOr> create(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback); + + virtual void set_underrun_callback(Function) override; + + virtual NonnullRefPtr> resume() override; + virtual NonnullRefPtr> drain_buffer_and_suspend() override; + virtual NonnullRefPtr> discard_buffer_and_suspend() override; + + virtual ErrorOr total_time_played() override; + + virtual NonnullRefPtr> set_volume(double) override; + +private: + // This struct is kept alive until the control thread exits to prevent a use-after-free without blocking on + // the UI thread. + class InternalState : public AtomicRefCounted { + public: + void set_thread(NonnullRefPtr const&); + + void set_stream(NonnullRefPtr const&); + RefPtr stream(); + + void enqueue(Function&&); + void thread_loop(); + ErrorOr check_is_running(); + void exit(); + + private: + RefPtr m_stream { nullptr }; + + Queue> m_tasks; + Threading::Mutex m_mutex; + Threading::ConditionVariable m_wake_condition { m_mutex }; + + Atomic m_exit { false }; + + RefPtr m_thread { nullptr }; + }; + + PlaybackStreamPulseAudio(NonnullRefPtr); + ~PlaybackStreamPulseAudio(); + + RefPtr m_state; +}; + +} diff --git a/Userland/Libraries/LibAudio/PulseAudioWrappers.cpp b/Userland/Libraries/LibAudio/PulseAudioWrappers.cpp new file mode 100644 index 0000000000..f99d338ea9 --- /dev/null +++ b/Userland/Libraries/LibAudio/PulseAudioWrappers.cpp @@ -0,0 +1,476 @@ +/* + * Copyright (c) 2023, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#include "PulseAudioWrappers.h" + +#include +#include + +namespace Audio { + +ErrorOr> PulseAudioContext::instance() +{ + // Use a weak pointer to allow the context to be shut down if we stop outputting audio. + static WeakPtr the_instance; + static Threading::Mutex instantiation_mutex; + auto instantiation_locker = Threading::MutexLocker(instantiation_mutex); + + RefPtr strong_instance_pointer = the_instance.strong_ref(); + + if (strong_instance_pointer == nullptr) { + auto* main_loop = pa_threaded_mainloop_new(); + if (main_loop == nullptr) + return Error::from_string_literal("Failed to create PulseAudio main loop"); + + auto* api = pa_threaded_mainloop_get_api(main_loop); + if (api == nullptr) + return Error::from_string_literal("Failed to get PulseAudio API"); + + auto* context = pa_context_new(api, "Ladybird"); + if (context == nullptr) + return Error::from_string_literal("Failed to get PulseAudio connection context"); + + strong_instance_pointer = make_ref_counted(main_loop, api, context); + + // Set a callback to signal ourselves to wake when the state changes, so that we can + // synchronously wait for the connection. + pa_context_set_state_callback( + context, [](pa_context*, void* user_data) { + static_cast(user_data)->signal_to_wake(); + }, + strong_instance_pointer.ptr()); + + if (auto error = pa_context_connect(context, nullptr, PA_CONTEXT_NOFLAGS, nullptr); error < 0) { + warnln("Starting PulseAudio context connection failed with error: {}", pulse_audio_error_to_string(static_cast(-error))); + return Error::from_string_literal("Error while starting PulseAudio daemon connection"); + } + + if (auto error = pa_threaded_mainloop_start(main_loop); error < 0) { + warnln("Starting PulseAudio main loop failed with error: {}", pulse_audio_error_to_string(static_cast(-error))); + return Error::from_string_literal("Failed to start PulseAudio main loop"); + } + + { + auto locker = strong_instance_pointer->main_loop_locker(); + while (true) { + bool is_ready = false; + switch (strong_instance_pointer->get_connection_state()) { + case PulseAudioContextState::Connecting: + case PulseAudioContextState::Authorizing: + case PulseAudioContextState::SettingName: + break; + case PulseAudioContextState::Ready: + is_ready = true; + break; + case PulseAudioContextState::Failed: + warnln("PulseAudio server connection failed with error: {}", pulse_audio_error_to_string(strong_instance_pointer->get_last_error())); + return Error::from_string_literal("Failed to connect to PulseAudio server"); + case PulseAudioContextState::Unconnected: + case PulseAudioContextState::Terminated: + VERIFY_NOT_REACHED(); + break; + } + + if (is_ready) + break; + + strong_instance_pointer->wait_for_signal(); + } + + pa_context_set_state_callback(context, nullptr, nullptr); + } + + the_instance = strong_instance_pointer; + } + + return strong_instance_pointer.release_nonnull(); +} + +PulseAudioContext::PulseAudioContext(pa_threaded_mainloop* main_loop, pa_mainloop_api* api, pa_context* context) + : m_main_loop(main_loop) + , m_api(api) + , m_context(context) +{ +} + +PulseAudioContext::~PulseAudioContext() +{ + pa_context_disconnect(m_context); + pa_context_unref(m_context); + pa_threaded_mainloop_stop(m_main_loop); + pa_threaded_mainloop_free(m_main_loop); +} + +bool PulseAudioContext::current_thread_is_main_loop_thread() +{ + return static_cast(pa_threaded_mainloop_in_thread(m_main_loop)); +} + +void PulseAudioContext::lock_main_loop() +{ + if (!current_thread_is_main_loop_thread()) + pa_threaded_mainloop_lock(m_main_loop); +} + +void PulseAudioContext::unlock_main_loop() +{ + if (!current_thread_is_main_loop_thread()) + pa_threaded_mainloop_unlock(m_main_loop); +} + +void PulseAudioContext::wait_for_signal() +{ + pa_threaded_mainloop_wait(m_main_loop); +} + +void PulseAudioContext::signal_to_wake() +{ + pa_threaded_mainloop_signal(m_main_loop, 0); +} + +PulseAudioContextState PulseAudioContext::get_connection_state() +{ + return static_cast(pa_context_get_state(m_context)); +} + +bool PulseAudioContext::connection_is_good() +{ + return PA_CONTEXT_IS_GOOD(pa_context_get_state(m_context)); +} + +PulseAudioErrorCode PulseAudioContext::get_last_error() +{ + return static_cast(pa_context_errno(m_context)); +} + +#define STREAM_SIGNAL_CALLBACK(stream) \ + [](auto*, int, void* user_data) { \ + static_cast(user_data)->m_context->signal_to_wake(); \ + }, \ + (stream) + +ErrorOr> PulseAudioContext::create_stream(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, PulseAudioDataRequestCallback write_callback) +{ + auto locker = main_loop_locker(); + + VERIFY(get_connection_state() == PulseAudioContextState::Ready); + pa_sample_spec sample_specification { + // FIXME: Support more audio sample types. + __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ ? PA_SAMPLE_FLOAT32LE : PA_SAMPLE_FLOAT32BE, + sample_rate, + channels, + }; + + // Check the sample specification and channel map here. These are also checked by stream_new(), + // but we can return a more accurate error if we check beforehand. + if (pa_sample_spec_valid(&sample_specification) == 0) + return Error::from_string_literal("PulseAudio sample specification is invalid"); + pa_channel_map channel_map; + if (pa_channel_map_init_auto(&channel_map, sample_specification.channels, PA_CHANNEL_MAP_DEFAULT) == 0) { + warnln("Getting default PulseAudio channel map failed with error: {}", pulse_audio_error_to_string(get_last_error())); + return Error::from_string_literal("Failed to get default PulseAudio channel map"); + } + + // Create the stream object and set a callback to signal ourselves to wake when the stream changes states, + // allowing us to wait synchronously for it to become Ready or Failed. + auto* stream = pa_stream_new_with_proplist(m_context, "Audio Stream", &sample_specification, &channel_map, nullptr); + if (stream == nullptr) { + warnln("Instantiating PulseAudio stream failed with error: {}", pulse_audio_error_to_string(get_last_error())); + return Error::from_string_literal("Failed to create PulseAudio stream"); + } + pa_stream_set_state_callback( + stream, [](pa_stream*, void* user_data) { + static_cast(user_data)->signal_to_wake(); + }, + this); + + auto stream_wrapper = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) PulseAudioStream(NonnullRefPtr(*this), stream))); + + stream_wrapper->m_write_callback = move(write_callback); + pa_stream_set_write_callback( + stream, [](pa_stream* stream, size_t bytes_to_write, void* user_data) { + auto& stream_wrapper = *static_cast(user_data); + VERIFY(stream_wrapper.m_stream == stream); + stream_wrapper.on_write_requested(bytes_to_write); + }, + stream_wrapper.ptr()); + + // Borrowing logic from cubeb to set reasonable buffer sizes for a target latency: + // https://searchfox.org/mozilla-central/rev/3b707c8fd7e978eebf24279ee51ccf07895cfbcb/third_party/rust/cubeb-sys/libcubeb/src/cubeb_pulse.c#910-927 + pa_buffer_attr buffer_attributes; + buffer_attributes.maxlength = -1; + buffer_attributes.prebuf = -1; + buffer_attributes.tlength = target_latency_ms * sample_rate / 1000; + buffer_attributes.minreq = buffer_attributes.tlength / 4; + buffer_attributes.fragsize = buffer_attributes.minreq; + auto flags = static_cast(PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_RELATIVE_VOLUME); + + if (initial_state == OutputState::Suspended) { + stream_wrapper->m_suspended = true; + flags = static_cast(static_cast(flags) | PA_STREAM_START_CORKED); + } + + // This is a workaround for an issue with starting the stream corked, see PulseAudioPlaybackStream::total_time_played(). + pa_stream_set_started_callback( + stream, [](pa_stream* stream, void* user_data) { + static_cast(user_data)->m_started_playback = true; + pa_stream_set_started_callback(stream, nullptr, nullptr); + }, + stream_wrapper.ptr()); + + pa_stream_set_underflow_callback( + stream, [](pa_stream*, void* user_data) { + auto& stream = *static_cast(user_data); + if (stream.m_underrun_callback) + stream.m_underrun_callback(); + }, + stream_wrapper.ptr()); + + if (auto error = pa_stream_connect_playback(stream, nullptr, &buffer_attributes, flags, nullptr, nullptr); error != 0) { + warnln("PulseAudio stream connection failed with error: {}", pulse_audio_error_to_string(static_cast(error))); + return Error::from_string_literal("Error while connecting the PulseAudio stream"); + } + + // FIXME: This should be asynchronous if connection can take longer than a fraction of a second. + while (true) { + bool is_ready = false; + switch (stream_wrapper->get_connection_state()) { + case PulseAudioStreamState::Creating: + break; + case PulseAudioStreamState::Ready: + is_ready = true; + break; + case PulseAudioStreamState::Failed: + return Error::from_string_literal("Failed to connect to PulseAudio daemon"); + case PulseAudioStreamState::Unconnected: + case PulseAudioStreamState::Terminated: + VERIFY_NOT_REACHED(); + break; + } + if (is_ready) + break; + + wait_for_signal(); + } + + return stream_wrapper; +} + +PulseAudioStream::~PulseAudioStream() +{ + pa_stream_unref(m_stream); +} + +PulseAudioStreamState PulseAudioStream::get_connection_state() +{ + return static_cast(pa_stream_get_state(m_stream)); +} + +bool PulseAudioStream::connection_is_good() +{ + return PA_STREAM_IS_GOOD(pa_stream_get_state(m_stream)); +} + +void PulseAudioStream::set_underrun_callback(Function callback) +{ + auto locker = m_context->main_loop_locker(); + m_underrun_callback = move(callback); +} + +u32 PulseAudioStream::sample_rate() +{ + return pa_stream_get_sample_spec(m_stream)->rate; +} + +size_t PulseAudioStream::sample_size() +{ + return pa_sample_size(pa_stream_get_sample_spec(m_stream)); +} + +size_t PulseAudioStream::frame_size() +{ + return pa_frame_size(pa_stream_get_sample_spec(m_stream)); +} + +u8 PulseAudioStream::channel_count() +{ + return pa_stream_get_sample_spec(m_stream)->channels; +} + +void PulseAudioStream::on_write_requested(size_t bytes_to_write) +{ + VERIFY(m_write_callback); + if (m_suspended) + return; + while (bytes_to_write > 0) { + auto buffer = begin_write(bytes_to_write).release_value_but_fixme_should_propagate_errors(); + auto frame_size = this->frame_size(); + VERIFY(buffer.size() % frame_size == 0); + auto written_buffer = m_write_callback(*this, buffer, buffer.size() / frame_size); + if (written_buffer.size() == 0) { + cancel_write().release_value_but_fixme_should_propagate_errors(); + break; + } + bytes_to_write -= written_buffer.size(); + write(written_buffer).release_value_but_fixme_should_propagate_errors(); + } +} + +ErrorOr PulseAudioStream::begin_write(size_t bytes_to_write) +{ + void* data_pointer; + size_t data_size = bytes_to_write; + if (pa_stream_begin_write(m_stream, &data_pointer, &data_size) != 0 || data_pointer == nullptr) + return Error::from_string_literal("Failed to get the playback stream's write buffer from PulseAudio"); + return Bytes { data_pointer, data_size }; +} + +ErrorOr PulseAudioStream::write(ReadonlyBytes data) +{ + if (pa_stream_write(m_stream, data.data(), data.size(), nullptr, 0, PA_SEEK_RELATIVE) != 0) + return Error::from_string_literal("Failed to write data to PulseAudio playback stream"); + return {}; +} + +ErrorOr PulseAudioStream::cancel_write() +{ + if (pa_stream_cancel_write(m_stream) != 0) + return Error::from_string_literal("Failed to get the playback stream's write buffer from PulseAudio"); + return {}; +} + +bool PulseAudioStream::is_suspended() const +{ + return m_suspended; +} + +StringView pulse_audio_error_to_string(PulseAudioErrorCode code) +{ + if (code < PulseAudioErrorCode::OK || code >= PulseAudioErrorCode::Sentinel) + return "Unknown error code"sv; + + char const* string = pa_strerror(static_cast(code)); + return StringView { string, strlen(string) }; +} + +ErrorOr PulseAudioStream::wait_for_operation(pa_operation* operation, StringView error_message) +{ + while (pa_operation_get_state(operation) == PA_OPERATION_RUNNING) + m_context->wait_for_signal(); + if (!m_context->connection_is_good() || !this->connection_is_good()) { + auto pulse_audio_error_name = pulse_audio_error_to_string(m_context->get_last_error()); + warnln("Encountered stream error: {}", pulse_audio_error_name); + return Error::from_string_view(error_message); + } + pa_operation_unref(operation); + return {}; +} + +ErrorOr PulseAudioStream::drain_and_suspend() +{ + auto locker = m_context->main_loop_locker(); + + if (m_suspended) + return {}; + m_suspended = true; + + if (pa_stream_is_corked(m_stream) > 0) + return {}; + + TRY(wait_for_operation(pa_stream_drain(m_stream, STREAM_SIGNAL_CALLBACK(this)), "Draining PulseAudio stream failed"sv)); + TRY(wait_for_operation(pa_stream_cork(m_stream, 1, STREAM_SIGNAL_CALLBACK(this)), "Corking PulseAudio stream after drain failed"sv)); + return {}; +} + +ErrorOr PulseAudioStream::flush_and_suspend() +{ + auto locker = m_context->main_loop_locker(); + + if (m_suspended) + return {}; + m_suspended = true; + + if (pa_stream_is_corked(m_stream) > 0) + return {}; + + TRY(wait_for_operation(pa_stream_flush(m_stream, STREAM_SIGNAL_CALLBACK(this)), "Flushing PulseAudio stream failed"sv)); + TRY(wait_for_operation(pa_stream_cork(m_stream, 1, STREAM_SIGNAL_CALLBACK(this)), "Corking PulseAudio stream after flush failed"sv)); + return {}; +} + +ErrorOr PulseAudioStream::resume() +{ + auto locker = m_context->main_loop_locker(); + + if (!m_suspended) + return {}; + m_suspended = false; + + TRY(wait_for_operation(pa_stream_cork(m_stream, 0, STREAM_SIGNAL_CALLBACK(this)), "Uncorking PulseAudio stream failed"sv)); + + // Defer a write to the playback buffer on the PulseAudio main loop. Otherwise, playback will not + // begin again, despite the fact that we uncorked. + // NOTE: We ref here and then unref in the callback so that this stream will not be deleted until + // it finishes. + ref(); + pa_mainloop_api_once( + m_context->m_api, [](pa_mainloop_api*, void* user_data) { + auto& stream = *static_cast(user_data); + // NOTE: writable_size() returns -1 in case of an error. However, the value is still safe + // since begin_write() will interpret -1 as a default parameter and choose a good size. + auto bytes_to_write = pa_stream_writable_size(stream.m_stream); + stream.on_write_requested(bytes_to_write); + stream.unref(); + }, + this); + return {}; +} + +ErrorOr PulseAudioStream::total_time_played() +{ + auto locker = m_context->main_loop_locker(); + + // NOTE: This is a workaround for a PulseAudio issue. When a stream is started corked, + // the time smoother doesn't seem to be aware of it, so it will return the time + // since the stream was connected. Once the playback actually starts, the time + // resets back to zero. However, since we request monotonically-increasing time, + // this means that the smoother will register that it had a larger time before, + // and return that time instead, until we reach a timestamp greater than the + // last-returned time. If we never call pa_stream_get_time() until after giving + // the stream its first samples, the issue never occurs. + if (!m_started_playback) + return Duration::zero(); + + pa_usec_t time = 0; + auto error = pa_stream_get_time(m_stream, &time); + if (error == -PA_ERR_NODATA) + return Duration::zero(); + if (error != 0) + return Error::from_string_literal("Failed to get time from PulseAudio stream"); + if (time > NumericLimits::max()) { + warnln("WARNING: Audio time is too large!"); + time -= NumericLimits::max(); + } + return Duration::from_microseconds(static_cast(time)); +} + +ErrorOr PulseAudioStream::set_volume(double volume) +{ + auto locker = m_context->main_loop_locker(); + + auto index = pa_stream_get_index(m_stream); + if (index == PA_INVALID_INDEX) + return Error::from_string_literal("Failed to get PulseAudio stream index while setting volume"); + + auto pulse_volume = pa_sw_volume_from_linear(volume); + pa_cvolume per_channel_volumes; + pa_cvolume_set(&per_channel_volumes, channel_count(), pulse_volume); + + auto* operation = pa_context_set_sink_input_volume(m_context->m_context, index, &per_channel_volumes, STREAM_SIGNAL_CALLBACK(this)); + return wait_for_operation(operation, "Failed to set PulseAudio stream volume"sv); +} + +} diff --git a/Userland/Libraries/LibAudio/PulseAudioWrappers.h b/Userland/Libraries/LibAudio/PulseAudioWrappers.h new file mode 100644 index 0000000000..a40fcc431c --- /dev/null +++ b/Userland/Libraries/LibAudio/PulseAudioWrappers.h @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2023, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Audio { + +class PulseAudioStream; + +enum class PulseAudioContextState { + Unconnected = PA_CONTEXT_UNCONNECTED, + Connecting = PA_CONTEXT_CONNECTING, + Authorizing = PA_CONTEXT_AUTHORIZING, + SettingName = PA_CONTEXT_SETTING_NAME, + Ready = PA_CONTEXT_READY, + Failed = PA_CONTEXT_FAILED, + Terminated = PA_CONTEXT_TERMINATED, +}; + +enum class PulseAudioErrorCode; + +using PulseAudioDataRequestCallback = Function; + +// A wrapper around the PulseAudio main loop and context structs. +// Generally, only one instance of this should be needed for a single process. +class PulseAudioContext + : public AtomicRefCounted + , public Weakable { +public: + static ErrorOr> instance(); + + explicit PulseAudioContext(pa_threaded_mainloop*, pa_mainloop_api*, pa_context*); + PulseAudioContext(PulseAudioContext const& other) = delete; + ~PulseAudioContext(); + + bool current_thread_is_main_loop_thread(); + void lock_main_loop(); + void unlock_main_loop(); + [[nodiscard]] auto main_loop_locker() + { + lock_main_loop(); + return ScopeGuard([this]() { unlock_main_loop(); }); + } + // Waits for signal_to_wake() to be called. + // This must be called with the main loop locked. + void wait_for_signal(); + // Signals to wake all threads from calls to signal_to_wake() + void signal_to_wake(); + + PulseAudioContextState get_connection_state(); + bool connection_is_good(); + PulseAudioErrorCode get_last_error(); + + ErrorOr> create_stream(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, PulseAudioDataRequestCallback write_callback); + +private: + friend class PulseAudioStream; + + pa_threaded_mainloop* m_main_loop { nullptr }; + pa_mainloop_api* m_api { nullptr }; + pa_context* m_context; +}; + +enum class PulseAudioStreamState { + Unconnected = PA_STREAM_UNCONNECTED, + Creating = PA_STREAM_CREATING, + Ready = PA_STREAM_READY, + Failed = PA_STREAM_FAILED, + Terminated = PA_STREAM_TERMINATED, +}; + +class PulseAudioStream : public AtomicRefCounted { +public: + static constexpr bool start_corked = true; + + ~PulseAudioStream(); + + PulseAudioStreamState get_connection_state(); + bool connection_is_good(); + + // Sets the callback to be run when the server consumes more of the buffer than + // has been written yet. + void set_underrun_callback(Function); + + u32 sample_rate(); + size_t sample_size(); + size_t frame_size(); + u8 channel_count(); + // Gets a data buffer that can be written to and then passed back to PulseAudio through + // the write() function. This avoids a copy vs directly calling write(). + ErrorOr begin_write(size_t bytes_to_write = NumericLimits::max()); + // Writes a data buffer to the playback stream. + ErrorOr write(ReadonlyBytes data); + // Cancels the previous begin_write() call. + ErrorOr cancel_write(); + + bool is_suspended() const; + // Plays back all buffered data and corks the stream. Until resume() is called, no data + // will be written to the stream. + ErrorOr drain_and_suspend(); + // Drops all buffered data and corks the stream. Until resume() is called, no data will + // be written to the stream. + ErrorOr flush_and_suspend(); + // Uncorks the stream and forces data to be written to the buffers to force playback to + // resume as soon as possible. + ErrorOr resume(); + ErrorOr total_time_played(); + + ErrorOr set_volume(double volume); + + PulseAudioContext& context() { return *m_context; } + +private: + friend class PulseAudioContext; + + explicit PulseAudioStream(NonnullRefPtr&& context, pa_stream* stream) + : m_context(context) + , m_stream(stream) + { + } + PulseAudioStream(PulseAudioStream const& other) = delete; + + ErrorOr wait_for_operation(pa_operation*, StringView error_message); + + void on_write_requested(size_t bytes_to_write); + + NonnullRefPtr m_context; + pa_stream* m_stream { nullptr }; + bool m_started_playback { false }; + PulseAudioDataRequestCallback m_write_callback { nullptr }; + // Determines whether we will allow the write callback to run. This should only be true + // if the stream is becoming or is already corked. + bool m_suspended { false }; + + Function m_underrun_callback; +}; + +enum class PulseAudioErrorCode { + OK = 0, + AccessFailure, + UnknownCommand, + InvalidArgument, + EntityExists, + NoSuchEntity, + ConnectionRefused, + ProtocolError, + Timeout, + NoAuthenticationKey, + InternalError, + ConnectionTerminated, + EntityKilled, + InvalidServer, + NoduleInitFailed, + BadState, + NoData, + IncompatibleProtocolVersion, + DataTooLarge, + NotSupported, + Unknown, + NoExtension, + Obsolete, + NotImplemented, + CalledFromFork, + IOError, + Busy, + Sentinel +}; + +StringView pulse_audio_error_to_string(PulseAudioErrorCode code); + +}