diff --git a/Libraries/LibAudio/ABuffer.h b/Libraries/LibAudio/ABuffer.h new file mode 100644 index 0000000000..30a3742e7d --- /dev/null +++ b/Libraries/LibAudio/ABuffer.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include + +// A single sample in an audio buffer. +// Values are floating point, and should range from -1.0 to +1.0 +struct ASample { + ASample() + : left(0) + , right(0) + {} + + // For mono + ASample(float left) + : left(left) + , right(left) + {} + + // For stereo + ASample(float left, float right) + : left(left) + , right(right) + {} + + void clamp() + { + if (left > 1) + left = 1; + else if (left < -1) + left = -1; + + if (right > 1) + right = 1; + else if (right < -1) + right = -1; + } + + ASample& operator+=(const ASample& other) + { + left += other.left; + right += other.right; + return *this; + } + + float left; + float right; +}; + +// A buffer of audio samples, normalized to 44100hz. +class ABuffer : public RefCounted { +public: + static RefPtr from_pcm_data(ByteBuffer& data, int num_channels, int bits_per_sample, int source_rate); + ABuffer(Vector& samples) + : m_samples(samples) + {} + + const Vector& samples() const { return m_samples; } + Vector& samples() { return m_samples; } + const void* data() const { return m_samples.data(); } + int size_in_bytes() const { return m_samples.size() * sizeof(ASample); } + +private: + Vector m_samples; +}; + diff --git a/Libraries/LibAudio/AClientConnection.cpp b/Libraries/LibAudio/AClientConnection.cpp index 002cc22fca..8cb3009564 100644 --- a/Libraries/LibAudio/AClientConnection.cpp +++ b/Libraries/LibAudio/AClientConnection.cpp @@ -1,13 +1,30 @@ #include "AClientConnection.h" +#include "ABuffer.h" +#include +#include #include #include +#include +#include +#include +#include AClientConnection::AClientConnection() + : m_notifier(CNotifier(m_connection.fd(), CNotifier::Read)) { + // We want to rate-limit our clients + m_connection.set_blocking(true); + m_notifier.on_ready_to_read = [this] { + drain_messages_from_server(); + }; m_connection.on_connected = [this] { - m_notifier = make(m_connection.fd(), CNotifier::Read); - m_notifier->on_ready_to_read = [this] { printf("AudioServer said something to us"); }; - m_connection.write("Hello, friends"); + ASAPI_ClientMessage request; + request.type = ASAPI_ClientMessage::Type::Greeting; + request.greeting.client_pid = getpid(); + auto response = sync_request(request, ASAPI_ServerMessage::Type::Greeting); + m_server_pid = response.greeting.server_pid; + m_my_client_id = response.greeting.your_client_id; + dbg() << "**** C: Got greeting from AudioServer: client ID " << m_my_client_id << " PID " << m_server_pid; }; int retries = 1000; @@ -17,10 +34,121 @@ AClientConnection::AClientConnection() } #ifdef ACLIENT_DEBUG - dbgprintf("AClientConnection: connect failed: %d, %s\n", errno, strerror(errno)); + dbgprintf("**** C: AClientConnection: connect failed: %d, %s\n", errno, strerror(errno)); #endif sleep(1); --retries; } } +bool AClientConnection::drain_messages_from_server() +{ + for (;;) { + ASAPI_ServerMessage message; + ssize_t nread = recv(m_connection.fd(), &message, sizeof(ASAPI_ServerMessage), MSG_DONTWAIT); + if (nread < 0) { + if (errno == EAGAIN) { + return true; + } + perror("read"); + exit(1); + return false; + } + if (nread == 0) { + dbgprintf("EOF on IPC fd\n"); + exit(1); + exit(-1); + return false; + } + ASSERT(nread == sizeof(message)); + ByteBuffer extra_data; + if (message.extra_size) { + extra_data = ByteBuffer::create_uninitialized(message.extra_size); + int extra_nread = read(m_connection.fd(), extra_data.data(), extra_data.size()); + if (extra_nread < 0) { + perror("read"); + ASSERT_NOT_REACHED(); + } + ASSERT((size_t)extra_nread == message.extra_size); + } + m_unprocessed_bundles.append({ move(message), move(extra_data) }); + } +} + +bool AClientConnection::wait_for_specific_event(ASAPI_ServerMessage::Type type, ASAPI_ServerMessage& event) +{ + for (;;) { + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(m_connection.fd(), &rfds); + int rc = select(m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr); + if (rc < 0) { + perror("select"); + } + ASSERT(rc > 0); + ASSERT(FD_ISSET(m_connection.fd(), &rfds)); + bool success = drain_messages_from_server(); + if (!success) + return false; + for (ssize_t i = 0; i < m_unprocessed_bundles.size(); ++i) { + if (m_unprocessed_bundles[i].message.type == type) { + event = move(m_unprocessed_bundles[i].message); + m_unprocessed_bundles.remove(i); + return true; + } + } + } +} + +bool AClientConnection::post_message_to_server(const ASAPI_ClientMessage& message, const ByteBuffer& extra_data) +{ + if (!extra_data.is_empty()) + const_cast(message).extra_size = extra_data.size(); + + struct iovec iov[2]; + int iov_count = 1; + iov[0].iov_base = const_cast(&message); + iov[0].iov_len = sizeof(message); + + if (!extra_data.is_empty()) { + iov[1].iov_base = const_cast(extra_data.data()); + iov[1].iov_len = extra_data.size(); + ++iov_count; + } + + int nwritten = writev(m_connection.fd(), iov, iov_count); + if (nwritten < 0) { + perror("writev"); + ASSERT_NOT_REACHED(); + } + ASSERT((size_t)nwritten == sizeof(message) + extra_data.size()); + + return true; +} + +ASAPI_ServerMessage AClientConnection::sync_request(const ASAPI_ClientMessage& request, ASAPI_ServerMessage::Type response_type) +{ + bool success = post_message_to_server(request); + ASSERT(success); + + ASAPI_ServerMessage response; + success = wait_for_specific_event(response_type, response); + ASSERT(success); + return response; +} + +void AClientConnection::play(const ABuffer& buffer) +{ + auto shared_buf = SharedBuffer::create(m_server_pid, buffer.size_in_bytes()); + if (!shared_buf) { + dbg() << "Failed to create a shared buffer!"; + return; + } + + memcpy(shared_buf->data(), buffer.data(), buffer.size_in_bytes()); + shared_buf->seal(); + ASAPI_ClientMessage request; + request.type = ASAPI_ClientMessage::Type::PlayBuffer; + request.play_buffer.buffer_id = shared_buf->shared_buffer_id(); + sync_request(request, ASAPI_ServerMessage::Type::PlayingBuffer); +} diff --git a/Libraries/LibAudio/AClientConnection.h b/Libraries/LibAudio/AClientConnection.h index 7df832babc..5ef369d942 100644 --- a/Libraries/LibAudio/AClientConnection.h +++ b/Libraries/LibAudio/AClientConnection.h @@ -2,12 +2,29 @@ #include #include +#include +class ABuffer; class AClientConnection { public: AClientConnection(); + void play(const ABuffer& buffer); + private: + bool drain_messages_from_server(); + bool wait_for_specific_event(ASAPI_ServerMessage::Type type, ASAPI_ServerMessage& event); + bool post_message_to_server(const ASAPI_ClientMessage& message, const ByteBuffer& extra_data = {}); + ASAPI_ServerMessage sync_request(const ASAPI_ClientMessage& request, ASAPI_ServerMessage::Type response_type); + CLocalSocket m_connection; - OwnPtr m_notifier; + CNotifier m_notifier; + + struct IncomingASMessageBundle { + ASAPI_ServerMessage message; + ByteBuffer extra_data; + }; + Vector m_unprocessed_bundles; + int m_server_pid; + int m_my_client_id; }; diff --git a/Libraries/LibAudio/ASAPI.h b/Libraries/LibAudio/ASAPI.h new file mode 100644 index 0000000000..e1c3c0b09a --- /dev/null +++ b/Libraries/LibAudio/ASAPI.h @@ -0,0 +1,42 @@ +#pragma once + +struct ASAPI_ServerMessage { + enum class Type { + Invalid, + Greeting, + PlayingBuffer, + }; + + Type type { Type::Invalid }; + unsigned extra_size { 0 }; + + union { + struct { + int server_pid; + int your_client_id; + } greeting; + struct { + int buffer_id; + } playing_buffer; + }; +}; + +struct ASAPI_ClientMessage { + enum class Type { + Invalid, + Greeting, + PlayBuffer, + }; + + Type type { Type::Invalid }; + unsigned extra_size { 0 }; + + union { + struct { + int client_pid; + } greeting; + struct { + int buffer_id; + } play_buffer; + }; +}; diff --git a/Libraries/LibAudio/AWavFile.cpp b/Libraries/LibAudio/AWavFile.cpp deleted file mode 100644 index ef9c77fa77..0000000000 --- a/Libraries/LibAudio/AWavFile.cpp +++ /dev/null @@ -1,2 +0,0 @@ -#include "AWavFile.h" - diff --git a/Libraries/LibAudio/AWavFile.h b/Libraries/LibAudio/AWavFile.h deleted file mode 100644 index 6321179363..0000000000 --- a/Libraries/LibAudio/AWavFile.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include -#include - -class AWavFile : public RefCounted { -public: - enum class Format { - Invalid, - PCM, - }; - - Format format() const { return m_format; } - u16 channel_count() const { return m_channel_count; } - u32 sample_rate_per_second() const { return m_sample_rate; } - u32 average_byte_rate_per_second() const { return m_byte_rate; } - u16 block_align() const { return m_block_align; } - u16 bits_per_sample() const { return m_bits_per_sample; } - const ByteBuffer& sample_data() const { return m_sample_data; } - -private: - Format m_format = Format::Invalid; - u16 m_channel_count = 0; - u32 m_sample_rate = 0; - u32 m_byte_rate = 0; - u16 m_block_align = 0; - u16 m_bits_per_sample = 0; - ByteBuffer m_sample_data; - - friend class AWavLoader; -}; diff --git a/Libraries/LibAudio/AWavLoader.cpp b/Libraries/LibAudio/AWavLoader.cpp index ff1048f17b..1369d6fb1a 100644 --- a/Libraries/LibAudio/AWavLoader.cpp +++ b/Libraries/LibAudio/AWavLoader.cpp @@ -3,9 +3,9 @@ #include #include "AWavLoader.h" -#include "AWavFile.h" +#include "ABuffer.h" -RefPtr AWavLoader::load_wav(const StringView& path) +RefPtr AWavLoader::load_wav(const StringView& path) { m_error_string = {}; @@ -20,7 +20,7 @@ RefPtr AWavLoader::load_wav(const StringView& path) } // TODO: A streaming parser might be better than forcing a ByteBuffer -RefPtr AWavLoader::parse_wav(ByteBuffer& buffer) +RefPtr AWavLoader::parse_wav(ByteBuffer& buffer) { BufferStream stream(buffer); @@ -62,36 +62,30 @@ RefPtr AWavLoader::parse_wav(ByteBuffer& buffer) CHECK_OK("FMT size"); ASSERT(fmt_size == 16); - auto ret = adopt(*new AWavFile); u16 audio_format; stream >> audio_format; CHECK_OK("Audio format"); // incomplete read check ok = ok && audio_format == 1; // WAVE_FORMAT_PCM ASSERT(audio_format == 1); CHECK_OK("Audio format"); // value check - ret->m_format = AWavFile::Format::PCM; u16 num_channels; stream >> num_channels; + ok = ok && (num_channels == 1 || num_channels == 2); CHECK_OK("Channel count"); - ret->m_channel_count = num_channels; u32 sample_rate; stream >> sample_rate; CHECK_OK("Sample rate"); - ret->m_sample_rate = sample_rate; u32 byte_rate; stream >> byte_rate; CHECK_OK("Byte rate"); - ret->m_byte_rate = byte_rate; u16 block_align; stream >> block_align; CHECK_OK("Block align"); - ret->m_block_align = block_align; u16 bits_per_sample; stream >> bits_per_sample; CHECK_OK("Bits per sample"); // incomplete read check ok = ok && (bits_per_sample == 8 || bits_per_sample == 16); ASSERT(bits_per_sample == 8 || bits_per_sample == 16); CHECK_OK("Bits per sample"); // value check - ret->m_bits_per_sample = bits_per_sample; // Read chunks until we find DATA bool found_data = false; @@ -118,10 +112,110 @@ RefPtr AWavLoader::parse_wav(ByteBuffer& buffer) ok = ok && int(data_sz) <= (buffer.size() - stream.offset()); CHECK_OK("Bad DATA (truncated)"); - ret->m_sample_data = buffer.slice(stream.offset(), data_sz); - - // At this point there should be no read failures! + // Just make sure we're good before we read the data... ASSERT(!stream.handle_read_failure()); - return ret; + + auto sample_data = buffer.slice(stream.offset(), data_sz); + + dbgprintf("Read WAV of format PCM with num_channels %d sample rate %d, bits per sample %d\n", num_channels, sample_rate, bits_per_sample); + + return ABuffer::from_pcm_data(sample_data, num_channels, bits_per_sample, sample_rate); } +// Small helper to resample from one playback rate to another +// This isn't really "smart", in that we just insert (or drop) samples. +// Should do better... +class AResampleHelper { +public: + AResampleHelper(float source, float target); + bool read_sample(); + void prepare(); +private: + const float m_ratio; + float m_current_ratio { 0 }; +}; + +AResampleHelper::AResampleHelper(float source, float target) + : m_ratio(source / target) +{ +} + +void AResampleHelper::prepare() +{ + m_current_ratio += m_ratio; +} + +bool AResampleHelper::read_sample() +{ + if (m_current_ratio > 1) { + m_current_ratio--; + return true; + } + + return false; +} + +template +static void read_samples_from_stream(BufferStream& stream, Vector& samples, int num_channels, int source_rate) +{ + AResampleHelper resampler(source_rate, 44100); + T sample = 0; + float norm_l = 0; + float norm_r = 0; + switch (num_channels) { + case 1: + while (!stream.handle_read_failure()) { + resampler.prepare(); + while (resampler.read_sample()) { + stream >> sample; + norm_l = float(sample) / std::numeric_limits::max(); + } + samples.append(ASample(norm_l)); + } + break; + case 2: + while (!stream.handle_read_failure()) { + resampler.prepare(); + while (resampler.read_sample()) { + stream >> sample; + norm_l = float(sample) / std::numeric_limits::max(); + stream >> sample; + norm_r = float(sample) / std::numeric_limits::max(); + } + samples.append(ASample(norm_l, norm_r)); + } + break; + default: + ASSERT_NOT_REACHED(); + } +} + +// ### can't const this because BufferStream is non-const +// perhaps we need a reading class separate from the writing one, that can be +// entirely consted. +RefPtr ABuffer::from_pcm_data(ByteBuffer& data, int num_channels, int bits_per_sample, int source_rate) +{ + BufferStream stream(data); + Vector fdata; + fdata.ensure_capacity(data.size() * 2); + + dbg() << "Reading " << bits_per_sample << " bits and " << num_channels << " channels, total bytes: " << data.size(); + + switch (bits_per_sample) { + case 8: + read_samples_from_stream(stream, fdata, num_channels, source_rate); + break; + case 16: + read_samples_from_stream(stream, fdata, num_channels, source_rate); + break; + default: + ASSERT_NOT_REACHED(); + } + + // We should handle this in a better way above, but for now -- + // just make sure we're good. Worst case we just write some 0s where they + // don't belong. + ASSERT(!stream.handle_read_failure()); + + return adopt(*new ABuffer(fdata)); +} diff --git a/Libraries/LibAudio/AWavLoader.h b/Libraries/LibAudio/AWavLoader.h index 72dee9f347..9e0c7ad2f6 100644 --- a/Libraries/LibAudio/AWavLoader.h +++ b/Libraries/LibAudio/AWavLoader.h @@ -1,14 +1,18 @@ #pragma once #include +#include +#include -class AWavFile; +class ABuffer; +class ByteBuffer; +// Parses a WAV file and produces an ABuffer instance from it class AWavLoader { public: - RefPtr load_wav(const StringView& path); + RefPtr load_wav(const StringView& path); const char* error_string() { return m_error_string.characters(); } private: - RefPtr parse_wav(ByteBuffer& buffer); + RefPtr parse_wav(ByteBuffer& buffer); String m_error_string; }; diff --git a/Libraries/LibAudio/Makefile b/Libraries/LibAudio/Makefile index fa2c6507aa..33939c3fba 100644 --- a/Libraries/LibAudio/Makefile +++ b/Libraries/LibAudio/Makefile @@ -2,7 +2,6 @@ include ../../Makefile.common OBJS = \ AClientConnection.o \ - AWavFile.o \ AWavLoader.o LIBRARY = libaudio.a diff --git a/Servers/AudioServer/ASClientConnection.cpp b/Servers/AudioServer/ASClientConnection.cpp new file mode 100644 index 0000000000..1bd4a9e255 --- /dev/null +++ b/Servers/AudioServer/ASClientConnection.cpp @@ -0,0 +1,176 @@ +#include "ASClientConnection.h" +#include "ASMixer.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +ASClientConnection::ASClientConnection(int fd, int client_id, ASMixer& mixer) + : m_socket(fd) + , m_notifier(CNotifier(fd, CNotifier::Read)) + , m_client_id(client_id) + , m_mixer(mixer) +{ + m_notifier.on_ready_to_read = [this] { drain_client(); }; + ASAPI_ServerMessage message; + message.type = ASAPI_ServerMessage::Type::Greeting; + message.greeting.server_pid = getpid(); + message.greeting.your_client_id = m_client_id; + post_message(message); + dbg() << "********** S: Created new ASClientConnection " << fd << client_id << " and said hello"; +} + +ASClientConnection::~ASClientConnection() +{ + dbg() << "********** S: Destroyed ASClientConnection " << m_socket.fd() << m_client_id; +} + +void ASClientConnection::post_message(const ASAPI_ServerMessage& message, const ByteBuffer& extra_data) +{ + if (!extra_data.is_empty()) + const_cast(message).extra_size = extra_data.size(); + + struct iovec iov[2]; + int iov_count = 1; + + iov[0].iov_base = const_cast(&message); + iov[0].iov_len = sizeof(message); + + if (!extra_data.is_empty()) { + iov[1].iov_base = const_cast(extra_data.data()); + iov[1].iov_len = extra_data.size(); + ++iov_count; + } + + int nwritten = writev(m_socket.fd(), iov, iov_count); + if (nwritten < 0) { + switch (errno) { + case EPIPE: + dbgprintf("WSClientConnection::post_message: Disconnected from peer.\n"); + delete_later(); + return; + break; + case EAGAIN: + dbgprintf("WSClientConnection::post_message: Client buffer overflowed.\n"); + did_misbehave(); + return; + break; + default: + perror("WSClientConnection::post_message writev"); + ASSERT_NOT_REACHED(); + } + } + + ASSERT(nwritten == (int)(sizeof(message) + extra_data.size())); +} + +void ASClientConnection::event(CEvent& event) +{ + if (event.type() == ASEvent::WM_ClientDisconnected) { + int client_id = static_cast(event).client_id(); + dbgprintf("ASClientConnection: Client disconnected: %d\n", client_id); + delete this; + return; + } + + CObject::event(event); +} + +void ASClientConnection::drain_client() +{ + unsigned messages_received = 0; + for (;;) { + ASAPI_ClientMessage message; + // FIXME: Don't go one message at a time, that's so much context switching, oof. + ssize_t nread = recv(m_socket.fd(), &message, sizeof(ASAPI_ClientMessage), MSG_DONTWAIT); + if (nread == 0 || (nread == -1 && errno == EAGAIN)) { + if (!messages_received) { + // TODO: is delete_later() sufficient? + CEventLoop::current().post_event(*this, make(m_client_id)); + } + break; + } + if (nread < 0) { + perror("recv"); + ASSERT_NOT_REACHED(); + } + ByteBuffer extra_data; + if (message.extra_size) { + if (message.extra_size >= 32768) { + dbgprintf("message.extra_size is way too large\n"); + return did_misbehave(); + } + extra_data = ByteBuffer::create_uninitialized(message.extra_size); + // FIXME: We should allow this to time out. Maybe use a socket timeout? + int extra_nread = read(m_socket.fd(), extra_data.data(), extra_data.size()); + if (extra_nread != (int)message.extra_size) { + dbgprintf("extra_nread(%d) != extra_size(%d)\n", extra_nread, extra_data.size()); + if (extra_nread < 0) + perror("read"); + return did_misbehave(); + } + } + if (!handle_message(message, move(extra_data))) + return; + ++messages_received; + } +} + +bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, const ByteBuffer&) +{ + switch (message.type) { + case ASAPI_ClientMessage::Type::Greeting: + m_pid = message.greeting.client_pid; + break; + case ASAPI_ClientMessage::Type::PlayBuffer: { + // ### ensure that the size is that of a Vector + Vector samples; + + { + const auto& shared_buf = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id); + if (!shared_buf) { + did_misbehave(); + return false; + } + + if (shared_buf->size() / sizeof(ASample) > 441000) { + did_misbehave(); + return false; + } + samples.resize(shared_buf->size() / sizeof(ASample)); + memcpy(samples.data(), shared_buf->data(), shared_buf->size()); + } + + // we no longer need the buffer, so acknowledge that it's playing + // TODO: rate limit playback here somehow + ASAPI_ServerMessage reply; + reply.type = ASAPI_ServerMessage::Type::PlayingBuffer; + reply.playing_buffer.buffer_id = message.play_buffer.buffer_id; + post_message(reply); + + m_mixer.queue(*this, adopt(*new ABuffer(samples))); + break; + } + case ASAPI_ClientMessage::Type::Invalid: + default: + dbgprintf("ASClientConnection: Unexpected message ID %d\n", int(message.type)); + did_misbehave(); + } + + return true; +} + +void ASClientConnection::did_misbehave() +{ + dbgprintf("ASClientConnection{%p} (id=%d, pid=%d) misbehaved, disconnecting.\n", this, m_client_id, m_pid); + delete_later(); + m_notifier.set_enabled(false); +} diff --git a/Servers/AudioServer/ASClientConnection.h b/Servers/AudioServer/ASClientConnection.h new file mode 100644 index 0000000000..27f9cd3259 --- /dev/null +++ b/Servers/AudioServer/ASClientConnection.h @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include +#include + +struct ASAPI_ServerMessage; +struct ASAPI_ClientMessage; + +class ASEvent : public CEvent { +public: + enum Type { + Invalid = 2000, + WM_ClientDisconnected, + }; + ASEvent() {} + explicit ASEvent(Type type) + : CEvent(type) + { + } +}; + +class ASClientDisconnectedNotification : public ASEvent { +public: + explicit ASClientDisconnectedNotification(int client_id) + : ASEvent(WM_ClientDisconnected) + , m_client_id(client_id) + { + } + + int client_id() const { return m_client_id; } + +private: + int m_client_id { 0 }; +}; + +class ASMixer; + +class ASClientConnection : public CObject +{ +public: + ASClientConnection(int fd, int client_id, ASMixer& mixer); + ~ASClientConnection(); + + void post_message(const ASAPI_ServerMessage&, const ByteBuffer& = {}); + bool handle_message(const ASAPI_ClientMessage&, const ByteBuffer& = {}); + + void drain_client(); + + void did_misbehave(); + + const char* class_name() const override { return "ASClientConnection"; } + +protected: + void event(CEvent& event) override; +private: + // TODO: A way to create some kind of CIODevice with an open FD would be nice. + class ASOpenedSocket : public CIODevice + { + public: + const char* class_name() const override { return "ASOpenedSocket"; } + ASOpenedSocket(int fd) + { + set_fd(fd); + set_mode(CIODevice::OpenMode::ReadWrite); + } + + bool open(CIODevice::OpenMode) override + { + ASSERT_NOT_REACHED(); + return true; + }; + + int fd() const { return CIODevice::fd(); } + }; + + ASOpenedSocket m_socket; + CNotifier m_notifier; + int m_client_id; + int m_pid; + ASMixer& m_mixer; +}; + diff --git a/Servers/AudioServer/ASEventLoop.cpp b/Servers/AudioServer/ASEventLoop.cpp new file mode 100644 index 0000000000..d68916efd6 --- /dev/null +++ b/Servers/AudioServer/ASEventLoop.cpp @@ -0,0 +1,37 @@ +#include "ASEventLoop.h" +#include "ASClientConnection.h" + +#include +#include +#include + +ASEventLoop::ASEventLoop() +{ + unlink("/tmp/asportal"); + + sockaddr_un address; + address.sun_family = AF_LOCAL; + strcpy(address.sun_path, "/tmp/asportal"); + int rc = bind(m_server_sock.fd(), (const sockaddr*)&address, sizeof(address)); + ASSERT(rc == 0); + rc = listen(m_server_sock.fd(), 5); + ASSERT(rc == 0); + + m_server_notifier = make(m_server_sock.fd(), CNotifier::Read); + m_server_notifier->on_ready_to_read = [this] { drain_server(); }; +} + +void ASEventLoop::drain_server() +{ + sockaddr_un address; + socklen_t address_size = sizeof(address); + int client_fd = accept(m_server_sock.fd(), (sockaddr*)&address, &address_size); + if (client_fd < 0) { + dbgprintf("AudioServer: accept() failed: %s\n", strerror(errno)); + } else { + dbgprintf("AudioServer: accept()ed client %d\n", client_fd); + static int s_next_client_id = 0; + new ASClientConnection(client_fd, s_next_client_id++, m_mixer); + } +} + diff --git a/Servers/AudioServer/ASEventLoop.h b/Servers/AudioServer/ASEventLoop.h new file mode 100644 index 0000000000..c6a5ed259f --- /dev/null +++ b/Servers/AudioServer/ASEventLoop.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include +#include "ASMixer.h" + +class ASEventLoop +{ +public: + ASEventLoop(); + int exec() { return m_event_loop.exec(); } +private: + CEventLoop m_event_loop; + CLocalSocket m_server_sock; + OwnPtr m_server_notifier; + ASMixer m_mixer; + + void drain_server(); +}; diff --git a/Servers/AudioServer/ASMixer.cpp b/Servers/AudioServer/ASMixer.cpp new file mode 100644 index 0000000000..15a00dcb4d --- /dev/null +++ b/Servers/AudioServer/ASMixer.cpp @@ -0,0 +1,112 @@ +#include +#include + +#include +#include "ASMixer.h" + +ASMixer::ASMixer() + : m_device("/dev/audio") +{ + if (!m_device.open(CIODevice::WriteOnly)) { + dbgprintf("Can't open audio device: %s\n", m_device.error_string()); + return; + } + + CThread sound_thread([](void* context) -> int { + ASMixer* mixer = (ASMixer*)context; + mixer->mix(); + return 0; + }, this); +} + +void ASMixer::queue(ASClientConnection&, const ABuffer& buffer) +{ + ASSERT(buffer.size_in_bytes()); + CLocker lock(m_lock); + m_pending_mixing.append(ASMixerBuffer(buffer)); +} + +void ASMixer::mix() +{ + Vector active_mix_buffers; + + for (;;) { + { + CLocker lock(m_lock); + for (const auto& buf : m_pending_mixing) + active_mix_buffers.append(buf); + m_pending_mixing.clear(); + } + + // ### use a wakeup of some kind rather than this garbage + if (active_mix_buffers.size() == 0) { + // nothing to mix yet + usleep(10000); + continue; + } + + int max_size = 0; + + for (auto& buffer : active_mix_buffers) { + if (buffer.done) + continue; + ASSERT(buffer.buffer->size_in_bytes()); // zero sized buffer? how? + max_size = max(max_size, buffer.buffer->size_in_bytes() - buffer.pos); + } + + // ### clear up 'done' buffers more aggressively + if (max_size == 0) { + active_mix_buffers.clear(); + continue; + } + + max_size = min(1023, max_size); + + Vector mixed_buffer; + mixed_buffer.resize(max_size); + + // Mix the buffers together into the output + for (auto& buffer : active_mix_buffers) { + if (buffer.done) + continue; + auto& samples = buffer.buffer->samples(); + + for (int i = 0; i < max_size && buffer.pos < samples.size(); ++buffer.pos, ++i) { + auto& mixed_sample = mixed_buffer[i]; + mixed_sample += samples[buffer.pos]; + } + + // clear it later + if (buffer.pos == samples.size()) + buffer.done = true; + } + + // output the mixed stuff to the device + // max_size is 0 indexed, so add 1. + const int output_buffer_byte_size = (max_size + 1) * 2 * 2; + ASSERT(output_buffer_byte_size == 4096); + ByteBuffer buffer(ByteBuffer::create_uninitialized(output_buffer_byte_size)); + BufferStream stream(buffer); + + for (int i = 0; i < mixed_buffer.size(); ++i) { + auto& mixed_sample = mixed_buffer[i]; + mixed_sample.clamp(); + + i16 out_sample; + out_sample = mixed_sample.left * std::numeric_limits::max(); + stream << out_sample; + + ASSERT(!stream.at_end()); // we should have enough space for both channels in one buffer! + out_sample = mixed_sample.right * std::numeric_limits::max(); + stream << out_sample; + + ASSERT(!stream.at_end()); + } + + if (stream.offset() != 0) { + buffer.trim(stream.offset()); + m_device.write(buffer); + mixed_buffer.resize(0); + } + } +} diff --git a/Servers/AudioServer/ASMixer.h b/Servers/AudioServer/ASMixer.h new file mode 100644 index 0000000000..2cf45fd13f --- /dev/null +++ b/Servers/AudioServer/ASMixer.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +class ASClientConnection; + +class ASMixer : public RefCounted { +public: + ASMixer(); + + void queue(ASClientConnection&, const ABuffer&); + +private: + struct ASMixerBuffer { + ASMixerBuffer(const NonnullRefPtr& buf) + : buffer(buf) + {} + NonnullRefPtr buffer; + int pos { 0 }; + bool done { false }; + }; + + Vector m_pending_mixing; + CFile m_device; + CLock m_lock; + + void mix(); +}; diff --git a/Servers/AudioServer/Makefile b/Servers/AudioServer/Makefile index 74b7b9c22d..e813207d10 100644 --- a/Servers/AudioServer/Makefile +++ b/Servers/AudioServer/Makefile @@ -1,7 +1,10 @@ include ../../Makefile.common AUDIOSERVER_OBJS = \ - main.o + main.o \ + ASMixer.o \ + ASClientConnection.o \ + ASEventLoop.o APP = AudioServer OBJS = $(AUDIOSERVER_OBJS) @@ -11,7 +14,7 @@ DEFINES += -DUSERLAND all: $(APP) $(APP): $(OBJS) - $(LD) -o $(APP) $(LDFLAGS) $(OBJS) -lc -lcore -laudio + $(LD) -o $(APP) $(LDFLAGS) $(OBJS) -lc -lcore .cpp.o: @echo "CXX $<"; $(CXX) $(CXXFLAGS) -o $@ -c $< diff --git a/Servers/AudioServer/main.cpp b/Servers/AudioServer/main.cpp index 915aad33f5..ac7ef5297b 100644 --- a/Servers/AudioServer/main.cpp +++ b/Servers/AudioServer/main.cpp @@ -1,86 +1,6 @@ #include -#include -#include -#include -#include -#include -#include -#include -#include - -class ASEventLoop -{ -public: - ASEventLoop(); - int exec() { return m_event_loop.exec(); } -private: - CEventLoop m_event_loop; - CLocalSocket m_server_sock; - OwnPtr m_server_notifier; - - void drain_server(); -}; - -void read_and_play_wav() -{ - CFile audio("/dev/audio"); - if (!audio.open(CIODevice::WriteOnly)) { - dbgprintf("Can't open audio device: %s\n", audio.error_string()); - return; - } - - AWavLoader loader; - const auto& file = loader.load_wav("/home/anon/tmp.wav"); - if (!file) { - dbgprintf("Can't parse WAV: %s\n", loader.error_string()); - return; - } - - dbgprintf("Read WAV of format %d with num_channels %d sample rate %d, bits per sample %d\n", (u8)file->format(), file->channel_count(), file->sample_rate_per_second(), file->bits_per_sample()); - - auto contents = file->sample_data(); - const int chunk_size = 4096; - int i = 0; - while (i < contents.size()) { - const auto chunk = contents.slice(i, chunk_size); - audio.write(chunk); - i += chunk_size; - } -} - -ASEventLoop::ASEventLoop() -{ - read_and_play_wav(); - - unlink("/tmp/asportal"); - - sockaddr_un address; - address.sun_family = AF_LOCAL; - strcpy(address.sun_path, "/tmp/asportal"); - int rc = bind(m_server_sock.fd(), (const sockaddr*)&address, sizeof(address)); - ASSERT(rc == 0); - rc = listen(m_server_sock.fd(), 5); - ASSERT(rc == 0); - - m_server_notifier = make(m_server_sock.fd(), CNotifier::Read); - m_server_notifier->on_ready_to_read = [this] { drain_server(); }; -} - -void ASEventLoop::drain_server() -{ - sockaddr_un address; - socklen_t address_size = sizeof(address); - int client_fd = accept(m_server_sock.fd(), (sockaddr*)&address, &address_size); - if (client_fd < 0) { - dbgprintf("WindowServer: accept() failed: %s\n", strerror(errno)); - } else { - dbgprintf("AudioServer: accept()ed client %d\n", client_fd); - String s("hello, client!\n"); - write(client_fd, s.characters(), s.length()); - close(client_fd); - } -} +#include "ASEventLoop.h" int main(int, char**) { diff --git a/Userland/Makefile b/Userland/Makefile index 80c84a6af8..e56242e7e2 100644 --- a/Userland/Makefile +++ b/Userland/Makefile @@ -19,7 +19,7 @@ clean: $(APPS) : % : %.o $(OBJS) @echo "LD $@" - @$(LD) -o $@ $(LDFLAGS) $< -lc -lgui -lcore + @$(LD) -o $@ $(LDFLAGS) $< -lc -lgui -laudio -lcore %.o: %.cpp @echo "CXX $<" diff --git a/Userland/aplay.cpp b/Userland/aplay.cpp new file mode 100644 index 0000000000..e50fe16edc --- /dev/null +++ b/Userland/aplay.cpp @@ -0,0 +1,29 @@ +#include +#include +#include +#include +#include + +int main(int argc, char **argv) +{ + CEventLoop loop; + if (argc < 2) { + fprintf(stderr, "Need a WAV to play\n"); + return 1; + } + + printf("Establishing connection\n"); + AClientConnection a_conn; + printf("Established connection\n"); + AWavLoader loader; + const auto& buffer = loader.load_wav(argv[1]); + if (!buffer) { + dbgprintf("Can't parse WAV: %s\n", loader.error_string()); + return 1; + } + + printf("Playing WAV\n"); + a_conn.play(*buffer); + printf("Exiting! :)\n"); + return 0; +}