1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-07-26 01:27:43 +00:00

CHttpJob: Drive response download via on_ready_read instead of blocking

This allows the event loop to service events while an HTTP download is
happening. Pretty cool :^)
This commit is contained in:
Andreas Kling 2019-08-04 18:55:52 +02:00
parent 72b69b82bb
commit b5aac9c44b
2 changed files with 25 additions and 28 deletions

View file

@ -24,14 +24,10 @@ void CHttpJob::on_socket_connected()
if (!success) if (!success)
return deferred_invoke([this](auto&) { did_fail(CNetworkJob::Error::TransmissionFailed); }); return deferred_invoke([this](auto&) { did_fail(CNetworkJob::Error::TransmissionFailed); });
Vector<ByteBuffer> received_buffers; m_socket->on_ready_to_read = [&] {
size_t received_size = 0;
while (m_socket->is_connected()) {
if (m_state == State::InStatus) { if (m_state == State::InStatus) {
while (!m_socket->can_read_line()) if (!m_socket->can_read_line())
usleep(1); return;
ASSERT(m_socket->can_read_line());
auto line = m_socket->read_line(PAGE_SIZE); auto line = m_socket->read_line(PAGE_SIZE);
if (line.is_null()) { if (line.is_null()) {
printf("Expected HTTP status\n"); printf("Expected HTTP status\n");
@ -49,11 +45,11 @@ void CHttpJob::on_socket_connected()
return deferred_invoke([this](auto&) { did_fail(CNetworkJob::Error::ProtocolFailed); }); return deferred_invoke([this](auto&) { did_fail(CNetworkJob::Error::ProtocolFailed); });
} }
m_state = State::InHeaders; m_state = State::InHeaders;
continue; return;
} }
if (m_state == State::InHeaders) { if (m_state == State::InHeaders) {
while (!m_socket->can_read_line()) if (!m_socket->can_read_line())
usleep(1); return;
auto line = m_socket->read_line(PAGE_SIZE); auto line = m_socket->read_line(PAGE_SIZE);
if (line.is_null()) { if (line.is_null()) {
printf("Expected HTTP header\n"); printf("Expected HTTP header\n");
@ -62,7 +58,7 @@ void CHttpJob::on_socket_connected()
auto chomped_line = String::copy(line, Chomp); auto chomped_line = String::copy(line, Chomp);
if (chomped_line.is_empty()) { if (chomped_line.is_empty()) {
m_state = State::InBody; m_state = State::InBody;
continue; return;
} }
auto parts = chomped_line.split(':'); auto parts = chomped_line.split(':');
if (parts.is_empty()) { if (parts.is_empty()) {
@ -77,40 +73,38 @@ void CHttpJob::on_socket_connected()
auto value = chomped_line.substring(name.length() + 2, chomped_line.length() - name.length() - 2); auto value = chomped_line.substring(name.length() + 2, chomped_line.length() - name.length() - 2);
m_headers.set(name, value); m_headers.set(name, value);
printf("[%s] = '%s'\n", name.characters(), value.characters()); printf("[%s] = '%s'\n", name.characters(), value.characters());
continue; return;
} }
ASSERT(m_state == State::InBody); ASSERT(m_state == State::InBody);
while (!m_socket->can_read())
usleep(1);
ASSERT(m_socket->can_read()); ASSERT(m_socket->can_read());
auto payload = m_socket->receive(PAGE_SIZE); auto payload = m_socket->receive(PAGE_SIZE);
if (!payload) { if (!payload) {
if (m_socket->eof()) { if (m_socket->eof())
m_state = State::Finished; return finish_up();
break;
}
return deferred_invoke([this](auto&) { did_fail(CNetworkJob::Error::ProtocolFailed); }); return deferred_invoke([this](auto&) { did_fail(CNetworkJob::Error::ProtocolFailed); });
} }
received_buffers.append(payload); m_received_buffers.append(payload);
received_size += payload.size(); m_received_size += payload.size();
auto content_length_header = m_headers.get("Content-Length"); auto content_length_header = m_headers.get("Content-Length");
if (content_length_header.has_value()) { if (content_length_header.has_value()) {
bool ok; bool ok;
if (received_size >= content_length_header.value().to_uint(ok) && ok) { if (m_received_size >= content_length_header.value().to_uint(ok) && ok)
m_state = State::Finished; return finish_up();
break;
}
} }
} };
}
auto flattened_buffer = ByteBuffer::create_uninitialized(received_size); void CHttpJob::finish_up()
{
m_state = State::Finished;
auto flattened_buffer = ByteBuffer::create_uninitialized(m_received_size);
u8* flat_ptr = flattened_buffer.data(); u8* flat_ptr = flattened_buffer.data();
for (auto& received_buffer : received_buffers) { for (auto& received_buffer : m_received_buffers) {
memcpy(flat_ptr, received_buffer.data(), received_buffer.size()); memcpy(flat_ptr, received_buffer.data(), received_buffer.size());
flat_ptr += received_buffer.size(); flat_ptr += received_buffer.size();
} }
received_buffers.clear(); m_received_buffers.clear();
auto response = CHttpResponse::create(m_code, move(m_headers), move(flattened_buffer)); auto response = CHttpResponse::create(m_code, move(m_headers), move(flattened_buffer));
deferred_invoke([this, response](auto&) { deferred_invoke([this, response](auto&) {

View file

@ -16,6 +16,7 @@ public:
private: private:
void on_socket_connected(); void on_socket_connected();
void finish_up();
enum class State { enum class State {
InStatus, InStatus,
@ -29,4 +30,6 @@ private:
State m_state { State::InStatus }; State m_state { State::InStatus };
int m_code { -1 }; int m_code { -1 };
HashMap<String, String> m_headers; HashMap<String, String> m_headers;
Vector<ByteBuffer> m_received_buffers;
size_t m_received_size { 0 };
}; };