1
Fork 0
mirror of https://github.com/RGBCube/serenity synced 2025-05-16 19:35:08 +00:00

LibSQL: Introduce Serializer as a mediator between Heap and client code

Classes reading and writing to the data heap would communicate directly
with the Heap object, and transfer ByteBuffers back and forth with it.
This makes things like caching and locking hard. Therefore all data
persistence activity will be funneled through a Serializer object which
in turn submits it to the Heap.

Introducing this unfortunately resulted in a huge amount of churn, in
which a number of smaller refactorings got caught up as well.
This commit is contained in:
Jan de Visser 2021-08-18 20:50:13 -04:00 committed by Andreas Kling
parent 9e43508d30
commit 85a84b0794
30 changed files with 995 additions and 780 deletions

View file

@@ -7,7 +7,7 @@
#include <LibSQL/HashIndex.h>
#include <LibSQL/Heap.h>
#include <LibSQL/Key.h>
#include <LibSQL/Serialize.h>
#include <LibSQL/Serializer.h>
namespace SQL {
@@ -19,18 +19,19 @@ HashDirectoryNode::HashDirectoryNode(HashIndex& index, u32 node_number, size_t o
{
}
HashDirectoryNode::HashDirectoryNode(HashIndex& index, u32 pointer, ByteBuffer& buffer)
HashDirectoryNode::HashDirectoryNode(HashIndex& index, u32 pointer)
: IndexNode(pointer)
, m_hash_index(index)
{
}
void HashDirectoryNode::deserialize(Serializer& serializer)
{
dbgln_if(SQL_DEBUG, "Deserializing Hash Directory Node");
size_t offset = 0;
deserialize_from<u32>(buffer, offset, index.m_global_depth);
u32 size;
deserialize_from<u32>(buffer, offset, size);
dbgln_if(SQL_DEBUG, "Global Depth {}, #Bucket pointers {}", index.global_depth(), size);
u32 next_node;
deserialize_from<u32>(buffer, offset, next_node);
m_hash_index.m_global_depth = serializer.deserialize<u32>();
auto size = serializer.deserialize<u32>();
dbgln_if(SQL_DEBUG, "Global Depth {}, #Bucket pointers {}", m_hash_index.global_depth(), size);
auto next_node = serializer.deserialize<u32>();
if (next_node) {
dbgln_if(SQL_DEBUG, "Next node {}", next_node);
m_hash_index.m_nodes.append(next_node);
@@ -39,20 +40,18 @@ HashDirectoryNode::HashDirectoryNode(HashIndex& index, u32 pointer, ByteBuffer&
m_is_last = true;
}
for (auto ix = 0u; ix < size; ix++) {
u32 bucket_pointer;
deserialize_from(buffer, offset, bucket_pointer);
u32 local_depth;
deserialize_from(buffer, offset, local_depth);
dbgln_if(SQL_DEBUG, "Bucket pointer {} local depth {}", bucket_pointer, local_depth);
index.append_bucket(ix, local_depth, bucket_pointer);
auto bucket_pointer = serializer.deserialize<u32>();
auto local_depth = serializer.deserialize<u32>();
dbgln_if(SQL_DEBUG, "--Index {} bucket pointer {} local depth {}", ix, bucket_pointer, local_depth);
m_hash_index.append_bucket(ix, local_depth, bucket_pointer);
}
}
void HashDirectoryNode::serialize(ByteBuffer& buffer) const
void HashDirectoryNode::serialize(Serializer& serializer) const
{
dbgln_if(SQL_DEBUG, "Serializing directory node #{}. Offset {}", m_node_number, m_offset);
serialize_to(buffer, m_hash_index.global_depth());
serialize_to(buffer, number_of_pointers());
serializer.serialize<u32>((u32)m_hash_index.global_depth());
serializer.serialize<u32>(number_of_pointers());
dbgln_if(SQL_DEBUG, "Global depth {}, #bucket pointers {}", m_hash_index.global_depth(), number_of_pointers());
u32 next_node;
@@ -64,12 +63,12 @@ void HashDirectoryNode::serialize(ByteBuffer& buffer) const
dbgln_if(SQL_DEBUG, "This is the last directory node");
}
serialize_to(buffer, next_node);
serializer.serialize<u32>(next_node);
for (auto ix = 0u; ix < number_of_pointers(); ix++) {
auto& bucket = m_hash_index.m_buckets[m_offset + ix];
dbgln_if(SQL_DEBUG, "Bucket pointer {} local depth {}", bucket->pointer(), bucket->local_depth());
serialize_to(buffer, bucket->pointer());
serialize_to(buffer, bucket->local_depth());
dbgln_if(SQL_DEBUG, "Bucket index #{} pointer {} local depth {} size {}", ix, bucket->pointer(), bucket->local_depth(), bucket->size());
serializer.serialize<u32>(bucket->pointer());
serializer.serialize<u32>(bucket->local_depth());
}
}
@@ -81,44 +80,41 @@ HashBucket::HashBucket(HashIndex& hash_index, u32 index, u32 local_depth, u32 po
{
}
void HashBucket::serialize(ByteBuffer& buffer) const
void HashBucket::serialize(Serializer& serializer) const
{
dbgln_if(SQL_DEBUG, "Serializing bucket: pointer {}, index #{}, local depth {} size {}",
pointer(), index(), local_depth(), size());
dbgln_if(SQL_DEBUG, "key_length: {} max_entries: {}", m_hash_index.descriptor()->data_length(), max_entries_in_bucket());
serialize_to(buffer, local_depth());
serialize_to(buffer, size());
dbgln_if(SQL_DEBUG, "buffer size after prolog {}", buffer.size());
serializer.serialize<u32>(local_depth());
serializer.serialize<u32>(size());
for (auto& key : m_entries) {
key.serialize(buffer);
dbgln_if(SQL_DEBUG, "Key {} buffer size {}", key.to_string(), buffer.size());
serializer.serialize<Key>(key);
}
}
void HashBucket::inflate()
void HashBucket::deserialize(Serializer& serializer)
{
if (m_inflated || !pointer())
return;
dbgln_if(SQL_DEBUG, "Inflating Hash Bucket {}", pointer());
auto buffer = m_hash_index.read_block(pointer());
size_t offset = 0;
deserialize_from(buffer, offset, m_local_depth);
m_local_depth = serializer.deserialize<u32>();
dbgln_if(SQL_DEBUG, "Bucket Local Depth {}", m_local_depth);
u32 size;
deserialize_from(buffer, offset, size);
auto size = serializer.deserialize<u32>();
dbgln_if(SQL_DEBUG, "Bucket has {} keys", size);
for (auto ix = 0u; ix < size; ix++) {
Key key(m_hash_index.descriptor(), buffer, offset);
auto key = serializer.deserialize<Key>(m_hash_index.descriptor());
dbgln_if(SQL_DEBUG, "Key {}: {}", ix, key.to_string());
m_entries.append(key);
}
m_inflated = true;
}
size_t HashBucket::max_entries_in_bucket() const
size_t HashBucket::length() const
{
auto key_size = m_hash_index.descriptor()->data_length() + sizeof(u32);
return (BLOCKSIZE - 2 * sizeof(u32)) / key_size;
size_t len = 2 * sizeof(u32);
for (auto& key : m_entries) {
len += key.length();
}
return len;
}
Optional<u32> HashBucket::get(Key& key)
@@ -134,15 +130,17 @@ Optional<u32> HashBucket::get(Key& key)
bool HashBucket::insert(Key const& key)
{
inflate();
if (!m_inflated)
m_hash_index.serializer().deserialize_block_to(pointer(), *this);
if (find_key_in_bucket(key).has_value()) {
return false;
}
if (size() >= max_entries_in_bucket()) {
if ((length() + key.length()) > BLOCKSIZE) {
dbgln_if(SQL_DEBUG, "Adding key {} would make length exceed block size", key.to_string());
return false;
}
m_entries.append(key);
m_hash_index.add_to_write_ahead_log(this);
m_hash_index.serializer().serialize_and_write(*this, pointer());
return true;
}
@@ -161,7 +159,7 @@ HashBucket const* HashBucket::next_bucket()
{
for (auto ix = m_index + 1; ix < m_hash_index.size(); ix++) {
auto bucket = m_hash_index.get_bucket_by_index(ix);
bucket->inflate();
m_hash_index.serializer().deserialize_block_to<HashBucket>(bucket->pointer(), *bucket);
if (bucket->size())
return bucket;
}
@@ -178,13 +176,27 @@ HashBucket const* HashBucket::previous_bucket()
return nullptr;
}
Vector<Key> const& HashBucket::entries()
{
if (!m_inflated)
m_hash_index.serializer().deserialize_block_to(pointer(), *this);
return m_entries;
}
Key const& HashBucket::operator[](size_t ix)
{
inflate();
if (!m_inflated)
m_hash_index.serializer().deserialize_block_to(pointer(), *this);
VERIFY(ix < size());
return m_entries[ix];
}
Key const& HashBucket::operator[](size_t ix) const
{
VERIFY(ix < m_entries.size());
return m_entries[ix];
}
void HashBucket::list_bucket()
{
warnln("Bucket #{} size {} local depth {} pointer {}{}",
@@ -194,20 +206,19 @@ void HashBucket::list_bucket()
}
}
HashIndex::HashIndex(Heap& heap, NonnullRefPtr<TupleDescriptor> const& descriptor, u32 first_node)
: Index(heap, descriptor, true, first_node)
HashIndex::HashIndex(Serializer& serializer, NonnullRefPtr<TupleDescriptor> const& descriptor, u32 first_node)
: Index(serializer, descriptor, true, first_node)
, m_nodes()
, m_buckets()
{
if (!first_node) {
set_pointer(new_record_pointer());
}
if (this->heap().has_block(first_node)) {
if (serializer.has_block(first_node)) {
u32 pointer = first_node;
do {
VERIFY(this->heap().has_block(pointer));
auto buffer = read_block(pointer);
auto node = HashDirectoryNode(*this, pointer, buffer);
VERIFY(serializer.has_block(pointer));
auto node = serializer.deserialize_block<HashDirectoryNode>(pointer, *this, pointer);
if (node.is_last())
break;
pointer = m_nodes.last(); // FIXME Ugly
@@ -215,10 +226,10 @@ HashIndex::HashIndex(Heap& heap, NonnullRefPtr<TupleDescriptor> const& descripto
} else {
auto bucket = append_bucket(0u, 1u, new_record_pointer());
bucket->m_inflated = true;
add_to_write_ahead_log(bucket);
serializer.serialize_and_write(*bucket, bucket->pointer());
bucket = append_bucket(1u, 1u, new_record_pointer());
bucket->m_inflated = true;
add_to_write_ahead_log(bucket);
serializer.serialize_and_write(*bucket, bucket->pointer());
m_nodes.append(first_node);
write_directory_to_write_ahead_log();
}
@@ -242,10 +253,12 @@ HashBucket* HashIndex::get_bucket_for_insert(Key const& key)
auto key_hash = key.hash();
do {
dbgln_if(SQL_DEBUG, "HashIndex::get_bucket_for_insert({}) bucket {} of {}", key.to_string(), key_hash % size(), size());
auto bucket = get_bucket(key_hash % size());
if (bucket->size() < bucket->max_entries_in_bucket()) {
if (bucket->length() + key.length() < BLOCKSIZE) {
return bucket;
}
dbgln_if(SQL_DEBUG, "Bucket is full (bucket size {}/length {} key length {}). Expanding directory", bucket->size(), bucket->length(), key.length());
// We previously doubled the directory but the target bucket is
// still at an older depth. Create new buckets at the current global
@@ -254,36 +267,47 @@ HashBucket* HashIndex::get_bucket_for_insert(Key const& key)
while (bucket->local_depth() < global_depth()) {
auto base_index = bucket->index();
auto step = 1 << (global_depth() - bucket->local_depth());
auto total_moved = 0;
for (auto ix = base_index + step; ix < size(); ix += step) {
auto& sub_bucket = m_buckets[ix];
sub_bucket->set_local_depth(bucket->local_depth() + 1);
auto moved = 0;
for (auto entry_index = (int)bucket->m_entries.size() - 1; entry_index >= 0; entry_index--) {
if (bucket->m_entries[entry_index].hash() % size() == ix) {
if (!sub_bucket->pointer()) {
sub_bucket->set_pointer(new_record_pointer());
}
sub_bucket->insert(bucket->m_entries.take(entry_index));
moved++;
}
}
if (m_buckets[ix]->pointer())
add_to_write_ahead_log(m_buckets[ix]);
if (moved > 0) {
dbgln_if(SQL_DEBUG, "Moved {} entries from bucket #{} to #{}", moved, base_index, ix);
serializer().serialize_and_write(*sub_bucket, sub_bucket->pointer());
}
total_moved += moved;
}
if (total_moved)
dbgln_if(SQL_DEBUG, "Redistributed {} entries from bucket #{}", total_moved, base_index);
else
dbgln_if(SQL_DEBUG, "Nothing redistributed from bucket #{}", base_index);
bucket->set_local_depth(bucket->local_depth() + 1);
add_to_write_ahead_log(bucket);
serializer().serialize_and_write(*bucket, bucket->pointer());
write_directory_to_write_ahead_log();
auto bucket_after_redistribution = get_bucket(key_hash % size());
if (bucket_after_redistribution->size() < bucket_after_redistribution->max_entries_in_bucket()) {
if (bucket_after_redistribution->length() + key.length() < BLOCKSIZE)
return bucket_after_redistribution;
}
}
expand();
} while (true);
VERIFY_NOT_REACHED();
}
void HashIndex::expand()
{
auto sz = size();
dbgln_if(SQL_DEBUG, "Expanding directory from {} to {} buckets", sz, 2 * sz);
for (auto i = 0u; i < sz; i++) {
auto bucket = get_bucket(i);
bucket = append_bucket(sz + i, bucket->local_depth(), 0u);
@@ -303,7 +327,7 @@ void HashIndex::write_directory_to_write_ahead_log()
size_t num_node = 0u;
while (offset < size()) {
HashDirectoryNode node(*this, num_node, offset);
add_to_write_ahead_log(node.as_index_node());
serializer().serialize_and_write(node, node.pointer());
offset += node.number_of_pointers();
}
}
@@ -325,14 +349,20 @@ Optional<u32> HashIndex::get(Key& key)
{
auto hash = key.hash();
auto bucket_index = hash % size();
dbgln_if(SQL_DEBUG, "HashIndex::get({}) bucket_index {}", key.to_string(), bucket_index);
auto bucket = get_bucket(bucket_index);
if constexpr (SQL_DEBUG)
bucket->list_bucket();
return bucket->get(key);
}
bool HashIndex::insert(Key const& key)
{
dbgln_if(SQL_DEBUG, "HashIndex::insert({})", key.to_string());
auto bucket = get_bucket_for_insert(key);
bucket->insert(key);
if constexpr (SQL_DEBUG)
bucket->list_bucket();
return true;
}
@@ -369,7 +399,6 @@ void HashIndex::list_hash()
bool first_bucket = true;
for (auto& bucket : m_buckets) {
if (first_bucket) {
warnln("Max. keys in bucket {}", bucket->max_entries_in_bucket());
first_bucket = false;
}
bucket->list_bucket();