From be31e2232cb2a62126ccef4be428eab7703eb617 Mon Sep 17 00:00:00 2001 From: Andreas Kling Date: Sun, 28 Jul 2019 21:27:18 +0200 Subject: [PATCH] AudioServer+LibAudio: Make mixing queue-based instead of buffer-based. Each client connection now sets up an ASBufferQueue, which is basically a queue of ABuffers. This allows us to immediately start streaming the next pending buffer whenever our current buffer runs out of samples. This makes the majority of the skippiness go away for me. :^) Also get rid of the old PlayBuffer API, since we don't need it anymore. --- Libraries/LibAudio/AClientConnection.cpp | 10 --- Libraries/LibAudio/AClientConnection.h | 1 - Libraries/LibAudio/ASAPI.h | 1 - Servers/AudioServer/ASClientConnection.cpp | 41 ++--------- Servers/AudioServer/ASClientConnection.h | 7 +- Servers/AudioServer/ASMixer.cpp | 81 ++++++++-------------- Servers/AudioServer/ASMixer.h | 49 ++++++++++--- 7 files changed, 78 insertions(+), 112 deletions(-) diff --git a/Libraries/LibAudio/AClientConnection.cpp b/Libraries/LibAudio/AClientConnection.cpp index abe9a307c4..d77a455c51 100644 --- a/Libraries/LibAudio/AClientConnection.cpp +++ b/Libraries/LibAudio/AClientConnection.cpp @@ -17,15 +17,6 @@ void AClientConnection::handshake() set_my_client_id(response.greeting.your_client_id); } -void AClientConnection::play(const ABuffer& buffer, bool block) -{ - const_cast(buffer).shared_buffer().share_with(server_pid()); - ASAPI_ClientMessage request; - request.type = ASAPI_ClientMessage::Type::PlayBuffer; - request.play_buffer.buffer_id = buffer.shared_buffer_id(); - sync_request(request, block ? ASAPI_ServerMessage::Type::FinishedPlayingBuffer : ASAPI_ServerMessage::Type::PlayingBuffer); -} - void AClientConnection::enqueue(const ABuffer& buffer) { for (;;) { @@ -36,7 +27,6 @@ void AClientConnection::enqueue(const ABuffer& buffer) auto response = sync_request(request, ASAPI_ServerMessage::Type::EnqueueBufferResponse); if (response.success) break; - dbg() << "EnqueueBuffer failed, retrying..."; sleep(1); } } diff --git a/Libraries/LibAudio/AClientConnection.h b/Libraries/LibAudio/AClientConnection.h index 2cc224353d..1a6c00abac 100644 --- a/Libraries/LibAudio/AClientConnection.h +++ b/Libraries/LibAudio/AClientConnection.h @@ -11,6 +11,5 @@ public: AClientConnection(); virtual void handshake() override; - void play(const ABuffer&, bool block); void enqueue(const ABuffer&); }; diff --git a/Libraries/LibAudio/ASAPI.h b/Libraries/LibAudio/ASAPI.h index 0dabd28c86..47da165de6 100644 --- a/Libraries/LibAudio/ASAPI.h +++ b/Libraries/LibAudio/ASAPI.h @@ -28,7 +28,6 @@ struct ASAPI_ClientMessage { enum class Type { Invalid, Greeting, - PlayBuffer, EnqueueBuffer, }; diff --git a/Servers/AudioServer/ASClientConnection.cpp b/Servers/AudioServer/ASClientConnection.cpp index 2e22dab008..f365feae72 100644 --- a/Servers/AudioServer/ASClientConnection.cpp +++ b/Servers/AudioServer/ASClientConnection.cpp @@ -38,22 +38,6 @@ bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, cons case ASAPI_ClientMessage::Type::Greeting: set_client_pid(message.greeting.client_pid); break; - case ASAPI_ClientMessage::Type::PlayBuffer: { - auto shared_buffer = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id); - if (!shared_buffer) { - did_misbehave(); - return false; - } - - // we no longer need the buffer, so acknowledge that it's playing - ASAPI_ServerMessage reply; - reply.type = ASAPI_ServerMessage::Type::PlayingBuffer; - reply.playing_buffer.buffer_id = message.play_buffer.buffer_id; - post_message(reply); - - m_mixer.queue(*this, ABuffer::create_with_shared_buffer(*shared_buffer)); - break; - } case ASAPI_ClientMessage::Type::EnqueueBuffer: { auto shared_buffer = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id); if (!shared_buffer) { @@ -61,21 +45,19 @@ bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, cons return false; } - static const int max_in_queue = 2; - ASAPI_ServerMessage reply; reply.type = ASAPI_ServerMessage::Type::EnqueueBufferResponse; reply.playing_buffer.buffer_id = message.play_buffer.buffer_id; - if (m_buffer_queue.size() >= max_in_queue) { + + if (!m_queue) + m_queue = m_mixer.create_queue(*this); + + if (m_queue->is_full()) { reply.success = false; } else { - m_buffer_queue.enqueue(ABuffer::create_with_shared_buffer(*shared_buffer)); + m_queue->enqueue(ABuffer::create_with_shared_buffer(*shared_buffer)); } post_message(reply); - - if (m_playing_queued_buffer_id == -1) - play_next_in_queue(); - break; } case ASAPI_ClientMessage::Type::Invalid: @@ -89,19 +71,8 @@ bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, cons void ASClientConnection::did_finish_playing_buffer(Badge, int buffer_id) { - if (m_playing_queued_buffer_id == buffer_id) - play_next_in_queue(); - ASAPI_ServerMessage reply; reply.type = ASAPI_ServerMessage::Type::FinishedPlayingBuffer; reply.playing_buffer.buffer_id = buffer_id; post_message(reply); } - -void ASClientConnection::play_next_in_queue() -{ - dbg() << "Playing next in queue (" << m_buffer_queue.size() << " queued)"; - auto buffer = m_buffer_queue.dequeue(); - m_playing_queued_buffer_id = buffer->shared_buffer_id(); - m_mixer.queue(*this, move(buffer)); -} diff --git a/Servers/AudioServer/ASClientConnection.h b/Servers/AudioServer/ASClientConnection.h index ee89c4fab5..5747bdc841 100644 --- a/Servers/AudioServer/ASClientConnection.h +++ b/Servers/AudioServer/ASClientConnection.h @@ -1,10 +1,10 @@ #pragma once -#include #include #include class ABuffer; +class ASBufferQueue; class ASMixer; class ASClientConnection final : public IPC::Server::Connection { @@ -18,9 +18,6 @@ public: void did_finish_playing_buffer(Badge, int buffer_id); private: - void play_next_in_queue(); - ASMixer& m_mixer; - Queue> m_buffer_queue; - int m_playing_queued_buffer_id { -1 }; + RefPtr m_queue; }; diff --git a/Servers/AudioServer/ASMixer.cpp b/Servers/AudioServer/ASMixer.cpp index 1c9dcf7ed5..77053bd7b7 100644 --- a/Servers/AudioServer/ASMixer.cpp +++ b/Servers/AudioServer/ASMixer.cpp @@ -16,82 +16,60 @@ ASMixer::ASMixer() ASMixer* mixer = (ASMixer*)context; mixer->mix(); return 0; - }, this); + }, + this); } -void ASMixer::queue(ASClientConnection& client, const ABuffer& buffer) +NonnullRefPtr ASMixer::create_queue(ASClientConnection& client) { - ASSERT(buffer.size_in_bytes()); - CLocker lock(m_lock); - m_pending_mixing.append(ASMixerBuffer(buffer, client)); + LOCKER(m_lock); + auto queue = adopt(*new ASBufferQueue(client)); + m_pending_mixing.append(*queue); + return queue; } void ASMixer::mix() { - Vector active_mix_buffers; + decltype(m_pending_mixing) active_mix_queues; for (;;) { { - CLocker lock(m_lock); - active_mix_buffers.append(move(m_pending_mixing)); + LOCKER(m_lock); + active_mix_queues.append(move(m_pending_mixing)); } // ### use a wakeup of some kind rather than this garbage - if (active_mix_buffers.size() == 0) { + if (active_mix_queues.size() == 0) { // nothing to mix yet usleep(10000); continue; } - int max_size = 0; - - for (auto& buffer : active_mix_buffers) { - if (buffer.done) - continue; - ASSERT(buffer.buffer->size_in_bytes()); // zero sized buffer? how? - max_size = max(max_size, buffer.buffer->size_in_bytes() - buffer.pos); - } - - // ### clear up 'done' buffers more aggressively - if (max_size == 0) { - active_mix_buffers.clear(); - continue; - } - - max_size = min(1023, max_size); - - Vector mixed_buffer; - mixed_buffer.resize(max_size); + ASample mixed_buffer[1024]; + auto mixed_buffer_length = (int)(sizeof(mixed_buffer) / sizeof(ASample)); // Mix the buffers together into the output - for (auto& buffer : active_mix_buffers) { - if (buffer.done) + for (auto& queue : active_mix_queues) { + if (!queue->client()) { + queue->clear(); continue; - auto* samples = buffer.buffer->samples(); - auto sample_count = buffer.buffer->sample_count(); - - for (int i = 0; i < max_size && buffer.pos < sample_count; ++buffer.pos, ++i) { - auto& mixed_sample = mixed_buffer[i]; - mixed_sample += samples[buffer.pos]; } - // clear it later - if (buffer.pos == sample_count) { - if (buffer.m_client) - buffer.m_client->did_finish_playing_buffer({}, buffer.buffer->shared_buffer_id()); - buffer.done = true; + for (int i = 0; i < mixed_buffer_length; ++i) { + auto& mixed_sample = mixed_buffer[i]; + ASample sample; + if (!queue->get_next_sample(sample)) + break; + mixed_sample += sample; } } // output the mixed stuff to the device - // max_size is 0 indexed, so add 1. - const int output_buffer_byte_size = (max_size + 1) * 2 * 2; - ASSERT(output_buffer_byte_size == 4096); u8 raw_buffer[4096]; auto buffer = ByteBuffer::wrap(raw_buffer, sizeof(raw_buffer)); BufferStream stream(buffer); - for (int i = 0; i < mixed_buffer.size(); ++i) { + for (int i = 0; i < mixed_buffer_length; ++i) { auto& mixed_sample = mixed_buffer[i]; mixed_sample.clip(); @@ -102,20 +80,21 @@ void ASMixer::mix() ASSERT(!stream.at_end()); // we should have enough space for both channels in one buffer! out_sample = mixed_sample.right * std::numeric_limits::max(); stream << out_sample; - - ASSERT(!stream.at_end()); } if (stream.offset() != 0) { buffer.trim(stream.offset()); m_device.write(buffer); - mixed_buffer.resize(0); } } } -ASMixer::ASMixerBuffer::ASMixerBuffer(const NonnullRefPtr& buf, ASClientConnection& client) - : buffer(buf) - , m_client(client.make_weak_ptr()) +ASBufferQueue::ASBufferQueue(ASClientConnection& client) + : m_client(client.make_weak_ptr()) { } + +void ASBufferQueue::enqueue(NonnullRefPtr&& buffer) +{ + m_queue.enqueue(move(buffer)); +} diff --git a/Servers/AudioServer/ASMixer.h b/Servers/AudioServer/ASMixer.h index 253f3dfc8e..8ddd8573bf 100644 --- a/Servers/AudioServer/ASMixer.h +++ b/Servers/AudioServer/ASMixer.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -10,22 +11,52 @@ class ASClientConnection; +class ASBufferQueue : public RefCounted { +public: + explicit ASBufferQueue(ASClientConnection&); + ~ASBufferQueue() {} + + bool is_full() const { return m_queue.size() >= 3; } + void enqueue(NonnullRefPtr&&); + + bool get_next_sample(ASample& sample) + { + while (!m_current && !m_queue.is_empty()) + m_current = m_queue.dequeue(); + if (!m_current) + return false; + sample = m_current->samples()[m_position++]; + if (m_position >= m_current->sample_count()) { + m_current = nullptr; + m_position = 0; + } + return true; + } + + ASClientConnection* client() { return m_client.ptr(); } + void clear() + { + m_queue.clear(); + m_position = 0; + } + +private: + RefPtr m_current; + Queue> m_queue; + int m_position { 0 }; + int m_playing_queued_buffer_id { -1 }; + WeakPtr m_client; +}; + class ASMixer : public RefCounted { public: ASMixer(); - void queue(ASClientConnection&, const ABuffer&); + NonnullRefPtr create_queue(ASClientConnection&); private: - struct ASMixerBuffer { - ASMixerBuffer(const NonnullRefPtr&, ASClientConnection&); - NonnullRefPtr buffer; - int pos { 0 }; - bool done { false }; - WeakPtr m_client; - }; + Vector> m_pending_mixing; - Vector m_pending_mixing; CFile m_device; CLock m_lock;