mirror of
https://github.com/RGBCube/serenity
synced 2025-05-31 22:18:12 +00:00
LibSQL: Redesign heap storage to support arbitrary amounts of data
Previously, `Heap` would store serialized data in blocks of 1024 bytes regardless of the actual length. Data longer than 1024 bytes was silently truncated, causing database corruption. This changes the heap storage to prefix every block with two new fields: the total data size in bytes, and the next block to retrieve if the data is longer than what can be stored inside a single block. By chaining blocks together, we can store arbitrary amounts of data without needing to change any of the logic in the rest of LibSQL. As part of these changes, the "free list" is also removed from the heap, awaiting an actual implementation: it was never used. Note that this bumps the database version from 3 to 4, and as such invalidates (deletes) any database opened with LibSQL that is not version 4.
This commit is contained in:
parent
194f846f12
commit
6601ff9d65
13 changed files with 246 additions and 180 deletions
|
@ -1,5 +1,6 @@
|
|||
/*
|
||||
* Copyright (c) 2021, Jan de Visser <jan@de-visser.net>
|
||||
* Copyright (c) 2023, Jelle Raaijmakers <jelle@gmta.nl>
|
||||
*
|
||||
* SPDX-License-Identifier: BSD-2-Clause
|
||||
*/
|
||||
|
@ -42,8 +43,10 @@ ErrorOr<void> Heap::open()
|
|||
} else {
|
||||
file_size = stat_buffer.st_size;
|
||||
}
|
||||
if (file_size > 0)
|
||||
m_next_block = m_end_of_file = file_size / BLOCK_SIZE;
|
||||
if (file_size > 0) {
|
||||
m_next_block = file_size / Block::SIZE;
|
||||
m_highest_block_written = m_next_block - 1;
|
||||
}
|
||||
|
||||
auto file = TRY(Core::File::open(name(), Core::File::OpenMode::ReadWrite));
|
||||
m_file = TRY(Core::BufferedFile::create(move(file)));
|
||||
|
@ -54,7 +57,7 @@ ErrorOr<void> Heap::open()
|
|||
return error_maybe.release_error();
|
||||
}
|
||||
} else {
|
||||
initialize_zero_block();
|
||||
TRY(initialize_zero_block());
|
||||
}
|
||||
|
||||
// FIXME: We should more gracefully handle version incompatibilities. For now, we drop the database.
|
||||
|
@ -66,118 +69,137 @@ ErrorOr<void> Heap::open()
|
|||
return open();
|
||||
}
|
||||
|
||||
dbgln_if(SQL_DEBUG, "Heap file {} opened. Size = {}", name(), size());
|
||||
dbgln_if(SQL_DEBUG, "Heap file {} opened; number of blocks = {}", name(), m_highest_block_written);
|
||||
return {};
|
||||
}
|
||||
|
||||
ErrorOr<ByteBuffer> Heap::read_block(u32 block)
|
||||
bool Heap::has_block(Block::Index index) const
|
||||
{
|
||||
if (!m_file) {
|
||||
warnln("Heap({})::read_block({}): Heap file not opened"sv, name(), block);
|
||||
return Error::from_string_literal("Heap()::read_block(): Heap file not opened");
|
||||
return index <= m_highest_block_written || m_write_ahead_log.contains(index);
|
||||
}
|
||||
|
||||
// Reads the storage that starts at block `index` and returns its bytes as one contiguous buffer.
//
// Starting at `index`, each block is fetched with read_block() and the first size_in_bytes()
// bytes of its data are appended to the result; the walk then continues with the block's
// next_block() link. A link (or a starting index) of 0 ends the walk, so calling this with
// index 0 yields an empty buffer.
//
// Errors from read_block() and from growing the result buffer are propagated to the caller.
ErrorOr<ByteBuffer> Heap::read_storage(Block::Index index)
|
||||
{
|
||||
dbgln_if(SQL_DEBUG, "{}({})", __FUNCTION__, index);
|
||||
|
||||
// Reconstruct the data storage from a potential chain of blocks
|
||||
ByteBuffer data;
|
||||
// Index 0 doubles as the "no further block" marker that terminates the chain.
while (index > 0) {
|
||||
auto block = TRY(read_block(index));
|
||||
dbgln_if(SQL_DEBUG, " -> {} bytes", block.size_in_bytes());
|
||||
// Only the first size_in_bytes() bytes of the block's data are taken; the rest is not part of the storage.
TRY(data.try_append(block.data().bytes().slice(0, block.size_in_bytes())));
|
||||
index = block.next_block();
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
if (auto buffer = m_write_ahead_log.get(block); buffer.has_value())
|
||||
return TRY(ByteBuffer::copy(*buffer));
|
||||
ErrorOr<void> Heap::write_storage(Block::Index index, ReadonlyBytes data)
|
||||
{
|
||||
dbgln_if(SQL_DEBUG, "{}({}, {} bytes)", __FUNCTION__, index, data.size());
|
||||
VERIFY(data.size() > 0);
|
||||
|
||||
if (block >= m_next_block) {
|
||||
warnln("Heap({})::read_block({}): block # out of range (>= {})"sv, name(), block, m_next_block);
|
||||
return Error::from_string_literal("Heap()::read_block(): block # out of range");
|
||||
// Split up the storage across multiple blocks if necessary, creating a chain
|
||||
u32 remaining_size = static_cast<u32>(data.size());
|
||||
u32 offset_in_data = 0;
|
||||
while (remaining_size > 0) {
|
||||
auto block_data_size = AK::min(remaining_size, Block::DATA_SIZE);
|
||||
remaining_size -= block_data_size;
|
||||
auto next_block_index = (remaining_size > 0) ? request_new_block_index() : 0;
|
||||
|
||||
auto block_data = TRY(ByteBuffer::create_uninitialized(block_data_size));
|
||||
block_data.bytes().overwrite(0, data.offset(offset_in_data), block_data_size);
|
||||
|
||||
TRY(write_block({ index, block_data_size, next_block_index, move(block_data) }));
|
||||
|
||||
index = next_block_index;
|
||||
offset_in_data += block_data_size;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
dbgln_if(SQL_DEBUG, "Read heap block {}", block);
|
||||
TRY(seek_block(block));
|
||||
ErrorOr<ByteBuffer> Heap::read_raw_block(Block::Index index)
|
||||
{
|
||||
VERIFY(m_file);
|
||||
VERIFY(index < m_next_block);
|
||||
|
||||
auto buffer = TRY(ByteBuffer::create_uninitialized(BLOCK_SIZE));
|
||||
if (auto data = m_write_ahead_log.get(index); data.has_value())
|
||||
return data.value();
|
||||
|
||||
TRY(m_file->seek(index * Block::SIZE, SeekMode::SetPosition));
|
||||
auto buffer = TRY(ByteBuffer::create_uninitialized(Block::SIZE));
|
||||
TRY(m_file->read_until_filled(buffer));
|
||||
|
||||
dbgln_if(SQL_DEBUG, "{:hex-dump}", buffer.bytes().trim(8));
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
ErrorOr<void> Heap::write_block(u32 block, ByteBuffer& buffer)
|
||||
ErrorOr<Block> Heap::read_block(Block::Index index)
|
||||
{
|
||||
if (!m_file) {
|
||||
warnln("Heap({})::write_block({}): Heap file not opened"sv, name(), block);
|
||||
return Error::from_string_literal("Heap()::write_block(): Heap file not opened");
|
||||
}
|
||||
if (block > m_next_block) {
|
||||
warnln("Heap({})::write_block({}): block # out of range (> {})"sv, name(), block, m_next_block);
|
||||
return Error::from_string_literal("Heap()::write_block(): block # out of range");
|
||||
}
|
||||
if (buffer.size() > BLOCK_SIZE) {
|
||||
warnln("Heap({})::write_block({}): Oversized block ({} > {})"sv, name(), block, buffer.size(), BLOCK_SIZE);
|
||||
return Error::from_string_literal("Heap()::write_block(): Oversized block");
|
||||
}
|
||||
dbgln_if(SQL_DEBUG, "Read heap block {}", index);
|
||||
|
||||
dbgln_if(SQL_DEBUG, "Write heap block {} size {}", block, buffer.size());
|
||||
TRY(seek_block(block));
|
||||
auto buffer = TRY(read_raw_block(index));
|
||||
auto size_in_bytes = *reinterpret_cast<u32*>(buffer.offset_pointer(0));
|
||||
auto next_block = *reinterpret_cast<Block::Index*>(buffer.offset_pointer(sizeof(u32)));
|
||||
auto data = TRY(buffer.slice(Block::HEADER_SIZE, Block::DATA_SIZE));
|
||||
|
||||
if (auto current_size = buffer.size(); current_size < BLOCK_SIZE) {
|
||||
TRY(buffer.try_resize(BLOCK_SIZE));
|
||||
memset(buffer.offset_pointer(current_size), 0, BLOCK_SIZE - current_size);
|
||||
}
|
||||
|
||||
dbgln_if(SQL_DEBUG, "{:hex-dump}", buffer.bytes().trim(8));
|
||||
TRY(m_file->write_until_depleted(buffer));
|
||||
|
||||
if (block == m_end_of_file)
|
||||
m_end_of_file++;
|
||||
return {};
|
||||
return Block { index, size_in_bytes, next_block, move(data) };
|
||||
}
|
||||
|
||||
ErrorOr<void> Heap::seek_block(u32 block)
|
||||
ErrorOr<void> Heap::write_raw_block(Block::Index index, ReadonlyBytes data)
|
||||
{
|
||||
if (!m_file) {
|
||||
warnln("Heap({})::seek_block({}): Heap file not opened"sv, name(), block);
|
||||
return Error::from_string_literal("Heap()::seek_block(): Heap file not opened");
|
||||
}
|
||||
if (block > m_end_of_file) {
|
||||
warnln("Heap({})::seek_block({}): Cannot seek beyond end of file at block {}"sv, name(), block, m_end_of_file);
|
||||
return Error::from_string_literal("Heap()::seek_block(): Cannot seek beyond end of file");
|
||||
}
|
||||
dbgln_if(SQL_DEBUG, "Write raw block {}", index);
|
||||
|
||||
if (block == m_end_of_file)
|
||||
TRY(m_file->seek(0, SeekMode::FromEndPosition));
|
||||
else
|
||||
TRY(m_file->seek(block * BLOCK_SIZE, SeekMode::SetPosition));
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
u32 Heap::new_record_pointer()
|
||||
{
|
||||
VERIFY(m_file);
|
||||
if (m_free_list) {
|
||||
auto block_or_error = read_block(m_free_list);
|
||||
if (block_or_error.is_error()) {
|
||||
warnln("FREE LIST CORRUPTION");
|
||||
VERIFY_NOT_REACHED();
|
||||
}
|
||||
auto new_pointer = m_free_list;
|
||||
memcpy(&m_free_list, block_or_error.value().offset_pointer(0), sizeof(u32));
|
||||
update_zero_block();
|
||||
return new_pointer;
|
||||
}
|
||||
return m_next_block++;
|
||||
VERIFY(data.size() == Block::SIZE);
|
||||
|
||||
TRY(m_file->seek(index * Block::SIZE, SeekMode::SetPosition));
|
||||
TRY(m_file->write_until_depleted(data));
|
||||
|
||||
if (index > m_highest_block_written)
|
||||
m_highest_block_written = index;
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// Queues an already serialized block in the write-ahead log under `index`.
//
// `data` is moved into m_write_ahead_log and must be exactly Block::SIZE bytes long;
// `index` must be below m_next_block. Both conditions are enforced with VERIFY.
// Nothing is written to the heap file here. A failure to store the entry in the log
// is propagated to the caller.
ErrorOr<void> Heap::write_raw_block_to_wal(Block::Index index, ByteBuffer&& data)
|
||||
{
|
||||
dbgln_if(SQL_DEBUG, "{}(): adding raw block {} to WAL", __FUNCTION__, index);
|
||||
VERIFY(index < m_next_block);
|
||||
VERIFY(data.size() == Block::SIZE);
|
||||
|
||||
// try_set() stores the buffer under `index` in the log.
TRY(m_write_ahead_log.try_set(index, move(data)));
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// Serializes `block` into a raw buffer of Block::SIZE bytes and queues it in the write-ahead log.
//
// Raw layout produced here:
//   offset 0                      : size_in_bytes
//   offset sizeof(size_in_bytes)  : next_block
//   offset Block::HEADER_SIZE     : the block's data
// The buffer is created zeroed, so any bytes not covered by the header fields or the data stay 0.
//
// The block's index and next-block link must both be below m_next_block and its data must
// not exceed Block::DATA_SIZE; these conditions are enforced with VERIFY. Allocation and
// write-ahead log errors are propagated to the caller.
ErrorOr<void> Heap::write_block(Block const& block)
|
||||
{
|
||||
VERIFY(block.index() < m_next_block);
|
||||
VERIFY(block.next_block() < m_next_block);
|
||||
VERIFY(block.data().size() <= Block::DATA_SIZE);
|
||||
|
||||
// Copy the header fields into locals so their addresses can be passed to overwrite() below.
auto size_in_bytes = block.size_in_bytes();
|
||||
auto next_block = block.next_block();
|
||||
|
||||
auto heap_data = TRY(ByteBuffer::create_zeroed(Block::SIZE));
|
||||
heap_data.overwrite(0, &size_in_bytes, sizeof(size_in_bytes));
|
||||
heap_data.overwrite(sizeof(size_in_bytes), &next_block, sizeof(next_block));
|
||||
|
||||
// The data is placed starting at Block::HEADER_SIZE.
block.data().bytes().copy_to(heap_data.bytes().slice(Block::HEADER_SIZE));
|
||||
|
||||
// Hand the serialized block to the write-ahead log rather than writing it to the file directly.
return write_raw_block_to_wal(block.index(), move(heap_data));
|
||||
}
|
||||
|
||||
ErrorOr<void> Heap::flush()
|
||||
{
|
||||
VERIFY(m_file);
|
||||
Vector<u32> blocks;
|
||||
for (auto& wal_entry : m_write_ahead_log)
|
||||
blocks.append(wal_entry.key);
|
||||
quick_sort(blocks);
|
||||
for (auto& block : blocks) {
|
||||
auto buffer_it = m_write_ahead_log.find(block);
|
||||
VERIFY(buffer_it != m_write_ahead_log.end());
|
||||
dbgln_if(SQL_DEBUG, "Flushing block {} to {}", block, name());
|
||||
TRY(write_block(block, buffer_it->value));
|
||||
auto indices = m_write_ahead_log.keys();
|
||||
quick_sort(indices);
|
||||
for (auto index : indices) {
|
||||
dbgln_if(SQL_DEBUG, "Flushing block {} to {}", index, name());
|
||||
auto& data = m_write_ahead_log.get(index).value();
|
||||
TRY(write_raw_block(index, data));
|
||||
}
|
||||
m_write_ahead_log.clear();
|
||||
dbgln_if(SQL_DEBUG, "WAL flushed. Heap size = {}", size());
|
||||
dbgln_if(SQL_DEBUG, "WAL flushed; new number of blocks = {}", m_highest_block_written);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -186,37 +208,33 @@ constexpr static auto VERSION_OFFSET = FILE_ID.length();
|
|||
constexpr static auto SCHEMAS_ROOT_OFFSET = VERSION_OFFSET + sizeof(u32);
|
||||
constexpr static auto TABLES_ROOT_OFFSET = SCHEMAS_ROOT_OFFSET + sizeof(u32);
|
||||
constexpr static auto TABLE_COLUMNS_ROOT_OFFSET = TABLES_ROOT_OFFSET + sizeof(u32);
|
||||
constexpr static auto FREE_LIST_OFFSET = TABLE_COLUMNS_ROOT_OFFSET + sizeof(u32);
|
||||
constexpr static auto USER_VALUES_OFFSET = FREE_LIST_OFFSET + sizeof(u32);
|
||||
constexpr static auto USER_VALUES_OFFSET = TABLE_COLUMNS_ROOT_OFFSET + sizeof(u32);
|
||||
|
||||
ErrorOr<void> Heap::read_zero_block()
|
||||
{
|
||||
auto buffer = TRY(read_block(0));
|
||||
auto file_id_buffer = TRY(buffer.slice(0, FILE_ID.length()));
|
||||
dbgln_if(SQL_DEBUG, "Read zero block from {}", name());
|
||||
|
||||
auto block = TRY(read_raw_block(0));
|
||||
auto file_id_buffer = TRY(block.slice(0, FILE_ID.length()));
|
||||
auto file_id = StringView(file_id_buffer);
|
||||
if (file_id != FILE_ID) {
|
||||
warnln("{}: Zero page corrupt. This is probably not a {} heap file"sv, name(), FILE_ID);
|
||||
return Error::from_string_literal("Heap()::read_zero_block(): Zero page corrupt. This is probably not a SerenitySQL heap file");
|
||||
}
|
||||
|
||||
dbgln_if(SQL_DEBUG, "Read zero block from {}", name());
|
||||
|
||||
memcpy(&m_version, buffer.offset_pointer(VERSION_OFFSET), sizeof(u32));
|
||||
memcpy(&m_version, block.offset_pointer(VERSION_OFFSET), sizeof(u32));
|
||||
dbgln_if(SQL_DEBUG, "Version: {}.{}", (m_version & 0xFFFF0000) >> 16, (m_version & 0x0000FFFF));
|
||||
|
||||
memcpy(&m_schemas_root, buffer.offset_pointer(SCHEMAS_ROOT_OFFSET), sizeof(u32));
|
||||
memcpy(&m_schemas_root, block.offset_pointer(SCHEMAS_ROOT_OFFSET), sizeof(u32));
|
||||
dbgln_if(SQL_DEBUG, "Schemas root node: {}", m_schemas_root);
|
||||
|
||||
memcpy(&m_tables_root, buffer.offset_pointer(TABLES_ROOT_OFFSET), sizeof(u32));
|
||||
memcpy(&m_tables_root, block.offset_pointer(TABLES_ROOT_OFFSET), sizeof(u32));
|
||||
dbgln_if(SQL_DEBUG, "Tables root node: {}", m_tables_root);
|
||||
|
||||
memcpy(&m_table_columns_root, buffer.offset_pointer(TABLE_COLUMNS_ROOT_OFFSET), sizeof(u32));
|
||||
memcpy(&m_table_columns_root, block.offset_pointer(TABLE_COLUMNS_ROOT_OFFSET), sizeof(u32));
|
||||
dbgln_if(SQL_DEBUG, "Table columns root node: {}", m_table_columns_root);
|
||||
|
||||
memcpy(&m_free_list, buffer.offset_pointer(FREE_LIST_OFFSET), sizeof(u32));
|
||||
dbgln_if(SQL_DEBUG, "Free list: {}", m_free_list);
|
||||
|
||||
memcpy(m_user_values.data(), buffer.offset_pointer(USER_VALUES_OFFSET), m_user_values.size() * sizeof(u32));
|
||||
memcpy(m_user_values.data(), block.offset_pointer(USER_VALUES_OFFSET), m_user_values.size() * sizeof(u32));
|
||||
for (auto ix = 0u; ix < m_user_values.size(); ix++) {
|
||||
if (m_user_values[ix])
|
||||
dbgln_if(SQL_DEBUG, "User value {}: {}", ix, m_user_values[ix]);
|
||||
|
@ -224,43 +242,40 @@ ErrorOr<void> Heap::read_zero_block()
|
|||
return {};
|
||||
}
|
||||
|
||||
void Heap::update_zero_block()
|
||||
ErrorOr<void> Heap::update_zero_block()
|
||||
{
|
||||
dbgln_if(SQL_DEBUG, "Write zero block to {}", name());
|
||||
dbgln_if(SQL_DEBUG, "Version: {}.{}", (m_version & 0xFFFF0000) >> 16, (m_version & 0x0000FFFF));
|
||||
dbgln_if(SQL_DEBUG, "Schemas root node: {}", m_schemas_root);
|
||||
dbgln_if(SQL_DEBUG, "Tables root node: {}", m_tables_root);
|
||||
dbgln_if(SQL_DEBUG, "Table Columns root node: {}", m_table_columns_root);
|
||||
dbgln_if(SQL_DEBUG, "Free list: {}", m_free_list);
|
||||
for (auto ix = 0u; ix < m_user_values.size(); ix++) {
|
||||
if (m_user_values[ix])
|
||||
if (m_user_values[ix] > 0)
|
||||
dbgln_if(SQL_DEBUG, "User value {}: {}", ix, m_user_values[ix]);
|
||||
}
|
||||
|
||||
// FIXME: Handle an OOM failure here.
|
||||
auto buffer = ByteBuffer::create_zeroed(BLOCK_SIZE).release_value_but_fixme_should_propagate_errors();
|
||||
buffer.overwrite(0, FILE_ID.characters_without_null_termination(), FILE_ID.length());
|
||||
buffer.overwrite(VERSION_OFFSET, &m_version, sizeof(u32));
|
||||
buffer.overwrite(SCHEMAS_ROOT_OFFSET, &m_schemas_root, sizeof(u32));
|
||||
buffer.overwrite(TABLES_ROOT_OFFSET, &m_tables_root, sizeof(u32));
|
||||
buffer.overwrite(TABLE_COLUMNS_ROOT_OFFSET, &m_table_columns_root, sizeof(u32));
|
||||
buffer.overwrite(FREE_LIST_OFFSET, &m_free_list, sizeof(u32));
|
||||
buffer.overwrite(USER_VALUES_OFFSET, m_user_values.data(), m_user_values.size() * sizeof(u32));
|
||||
auto buffer = TRY(ByteBuffer::create_zeroed(Block::SIZE));
|
||||
auto buffer_bytes = buffer.bytes();
|
||||
buffer_bytes.overwrite(0, FILE_ID.characters_without_null_termination(), FILE_ID.length());
|
||||
buffer_bytes.overwrite(VERSION_OFFSET, &m_version, sizeof(u32));
|
||||
buffer_bytes.overwrite(SCHEMAS_ROOT_OFFSET, &m_schemas_root, sizeof(u32));
|
||||
buffer_bytes.overwrite(TABLES_ROOT_OFFSET, &m_tables_root, sizeof(u32));
|
||||
buffer_bytes.overwrite(TABLE_COLUMNS_ROOT_OFFSET, &m_table_columns_root, sizeof(u32));
|
||||
buffer_bytes.overwrite(USER_VALUES_OFFSET, m_user_values.data(), m_user_values.size() * sizeof(u32));
|
||||
|
||||
add_to_wal(0, buffer);
|
||||
return write_raw_block_to_wal(0, move(buffer));
|
||||
}
|
||||
|
||||
void Heap::initialize_zero_block()
|
||||
ErrorOr<void> Heap::initialize_zero_block()
|
||||
{
|
||||
m_version = VERSION;
|
||||
m_schemas_root = 0;
|
||||
m_tables_root = 0;
|
||||
m_table_columns_root = 0;
|
||||
m_next_block = 1;
|
||||
m_free_list = 0;
|
||||
for (auto& user : m_user_values)
|
||||
user = 0u;
|
||||
update_zero_block();
|
||||
return update_zero_block();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue