Task 2.A - B+TREE DATA STRUCTURE (INSERTION & POINT SEARCH) 实现b_plus_tree.cpp中InsertIntoLeaf函数所涉及到的相关函数。 您的B+树索引只能支持唯一键。 也就是说,当您尝试将具有重复键的键值对插入索引时,它应该返回false。
对于checkpoint1,仅需要B+Tree索引支持插入(Insert)和点搜索(GetValue)。 您不需要实现删除操作。 插入后如果当前键/值对的数量等于max_size,则应该正确执行分裂。 由于任何写操作都可能导致B+Tree索引中的root_page_id发生更改,因此您有责任更新header page(src/include/storage/page/header_page.h)中的root_page_id,以确保索引在磁盘上具有持久性。 在BPlusTree类中,我们已经为您实现了一个名为UpdateRootPageId的函数。 您需要做的就是在B+Tree索引的root_page_id更改时调用此函数。
您的B + Tree实现必须隐藏key/value等的详细信息,建议使用如下结构:
// Skeleton of the index class template. Key, value and comparator types are
// template parameters, so the tree itself never depends on their concrete
// layout.
template <typename KeyType, typename ValueType, typename KeyComparator>
class BPlusTree {
};
以下模板参数对应的类型已经为你实现:
KeyType: The type of each key in the index. This will only be GenericKey; the actual size of GenericKey is specified and instantiated with a template argument and depends on the data type of the indexed attribute.
ValueType: The type of each value in the index. This will only be a 64-bit RID.
KeyComparator: The class used to compare whether two KeyType instances are less than or greater than each other. These will be included in the KeyType implementation files.
你必须使用传入的transaction,把已经加锁的页面保存起来。 我们提供了读写锁存器的实现(src/include/common/rwlatch.h),并且已经在页面头文件(src/include/storage/page/page.h)中添加了获取和释放Latch锁的辅助函数。
GetValue相关函数
GetValue() 先找到leaf page,这里会调用FindLeafPageByOperation函数。 再在leaf page里查找这个key,调用叶子结点的查找函数返回value
。 同时需要考虑并发,这里先讨论查找操作并发。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 INDEX_TEMPLATE_ARGUMENTS bool BPLUSTREE_TYPE::GetValue (const KeyType &key, std::vector<ValueType> *result, Transaction *transaction) { Page *leaf_page = FindLeafPageByOperation (key, Operation::FIND, transaction).first; LeafPage *leaf_node = reinterpret_cast <LeafPage *>(leaf_page->GetData ()); ValueType value{}; bool is_exist = leaf_node->Lookup (key, &value, comparator_); leaf_page->RUnlatch (); buffer_pool_manager_->UnpinPage (leaf_page->GetPageId (), false ); if (!is_exist) { return false ; } result->push_back (value); return true ; }
FindLeafPageByOperation() 对于查找操作,我们按照课堂上讲的规则,一步步加锁/解锁。 查找操作使用的是读锁RLatch,给下一层加锁后立即解锁上一层。 我们从根节点遍历到叶子节点,返回已加锁的目标page。 root_latch_
是为了维护root id不变。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 INDEX_TEMPLATE_ARGUMENTS std::pair<Page *, bool > BPLUSTREE_TYPE::FindLeafPageByOperation (const KeyType &key, Operation operation, Transaction *transaction, bool leftMost, bool rightMost) { assert (operation == Operation::FIND ? !(leftMost && rightMost) : transaction != nullptr ); root_latch_.lock (); bool is_root_page_id_latched = true ; Page *page = buffer_pool_manager_->FetchPage (root_page_id_); BPlusTreePage *node = reinterpret_cast <BPlusTreePage *>(page->GetData ()); if (operation == Operation::FIND) { page->RLatch (); is_root_page_id_latched = false ; root_latch_.unlock (); } else { page->WLatch (); if (IsSafe (node, operation)) { is_root_page_id_latched = false ; root_latch_.unlock (); } } while (!node->IsLeafPage ()) { InternalPage *i_node = reinterpret_cast <InternalPage *>(node); page_id_t child_node_page_id; if (leftMost) { child_node_page_id = i_node->ValueAt (0 ); } else if (rightMost) { child_node_page_id = i_node->ValueAt (i_node->GetSize () - 1 ); } else { child_node_page_id = i_node->Lookup (key, comparator_); } auto child_page = buffer_pool_manager_->FetchPage (child_node_page_id); auto child_node = reinterpret_cast <BPlusTreePage *>(child_page->GetData ()); if (operation == Operation::FIND) { child_page->RLatch (); page->RUnlatch (); buffer_pool_manager_->UnpinPage (page->GetPageId (), false ); } else { child_page->WLatch (); transaction->AddIntoPageSet (page); if (IsSafe (child_node, operation)) { if (is_root_page_id_latched) { is_root_page_id_latched = false ; root_latch_.unlock (); } UnlockUnpinPages (transaction); } } page = child_page; node = child_node; } return std::make_pair (page, is_root_page_id_latched); }
StartNewTree函数 创建一个新的根节点 先去缓冲池申请一个new page,作为root page(该函数会分配一个page id) page id赋值给root page id,并插入header page的root page id 使用leaf page的Insert函数插入(key,value) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 INDEX_TEMPLATE_ARGUMENTS void BPLUSTREE_TYPE::StartNewTree (const KeyType &key, const ValueType &value) { page_id_t new_page_id = INVALID_PAGE_ID; Page *root_page = buffer_pool_manager_->NewPage (&new_page_id); if (nullptr == root_page) { throw std::runtime_error ("out of memory" ); } root_page_id_ = new_page_id; UpdateRootPageId (1 ); LeafPage *root_node = reinterpret_cast <LeafPage *>(root_page->GetData ()); root_node->Init (new_page_id, INVALID_PAGE_ID, leaf_max_size_); root_node->Insert (key, value, comparator_); buffer_pool_manager_->UnpinPage (root_page->GetPageId (), true ); }
Insert函数 先判断插入时树是否为空,如果是空,建立一个新树,再插入。 注意新建根节点时要加锁。 1 2 3 4 5 6 7 8 9 10 11 INDEX_TEMPLATE_ARGUMENTS bool BPLUSTREE_TYPE::Insert (const KeyType &key, const ValueType &value, Transaction *transaction) { { const std::lock_guard<std::mutex> guard (root_latch_) ; if (IsEmpty ()) { StartNewTree (key, value); return true ; } } return InsertIntoLeaf (key, value, transaction); }
InsertIntoLeaf相关函数
FindLeafPageByOperation() 这里会用到Insert操作(Operation::INSERT)和写锁。 从根节点开始由上至下逐层加WLatch。 每一层都判断一次当前节点是否安全,即满足node->GetSize() < node->GetMaxSize() - 1
。 若安全,解锁并unpin所有经过的父节点。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 INDEX_TEMPLATE_ARGUMENTS std::pair<Page *, bool > BPLUSTREE_TYPE::FindLeafPageByOperation (const KeyType &key, Operation operation, Transaction *transaction, bool leftMost, bool rightMost) { assert (operation == Operation::FIND ? !(leftMost && rightMost) : transaction != nullptr ); root_latch_.lock (); bool is_root_page_id_latched = true ; Page *page = buffer_pool_manager_->FetchPage (root_page_id_); BPlusTreePage *node = reinterpret_cast <BPlusTreePage *>(page->GetData ()); if (operation == Operation::FIND) { page->RLatch (); is_root_page_id_latched = false ; root_latch_.unlock (); } else { page->WLatch (); if (IsSafe (node, operation)) { is_root_page_id_latched = false ; root_latch_.unlock (); } } while (!node->IsLeafPage ()) { InternalPage *i_node = reinterpret_cast <InternalPage *>(node); page_id_t child_node_page_id; if (leftMost) { child_node_page_id = i_node->ValueAt (0 ); } else if (rightMost) { child_node_page_id = i_node->ValueAt (i_node->GetSize () - 1 ); } else { child_node_page_id = i_node->Lookup (key, comparator_); } auto child_page = buffer_pool_manager_->FetchPage (child_node_page_id); auto child_node = reinterpret_cast <BPlusTreePage *>(child_page->GetData ()); if (operation == Operation::FIND) { child_page->RLatch (); page->RUnlatch (); buffer_pool_manager_->UnpinPage (page->GetPageId (), false ); } else { child_page->WLatch (); transaction->AddIntoPageSet (page); if (IsSafe (child_node, operation)) { if (is_root_page_id_latched) { is_root_page_id_latched = false ; root_latch_.unlock (); } UnlockUnpinPages (transaction); } } page = child_page; node = child_node; } return std::make_pair (page, is_root_page_id_latched); }
InsertIntoLeaf() 先找到待插入的叶子节点(已上锁),调用叶子节点的Insert。 如果待插入的key已存在,解锁并unpin当前节点及所有祖先节点,返回false。 如果不存在,且无需分裂,解锁并unpin当前页面,返回true。 如果需分裂,调用Split和InsertIntoParent
函数。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 INDEX_TEMPLATE_ARGUMENTS bool BPLUSTREE_TYPE::InsertIntoLeaf (const KeyType &key, const ValueType &value, Transaction *transaction) { auto [leaf_page, root_is_latched] = FindLeafPageByOperation (key, Operation::INSERT, transaction); LeafPage *leaf_node = reinterpret_cast <LeafPage *>(leaf_page->GetData ()); int size = leaf_node->GetSize (); int new_size = leaf_node->Insert (key, value, comparator_); if (new_size == size) { if (root_is_latched) { root_latch_.unlock (); } UnlockUnpinPages (transaction); leaf_page->WUnlatch (); buffer_pool_manager_->UnpinPage (leaf_page->GetPageId (), false ); return false ; } if (new_size < leaf_node->GetMaxSize ()) { if (root_is_latched) { root_latch_.unlock (); } leaf_page->WUnlatch (); buffer_pool_manager_->UnpinPage (leaf_page->GetPageId (), true ); return true ; } LeafPage *new_leaf_node = Split (leaf_node); bool *pointer_root_is_latched = new bool (root_is_latched); InsertIntoParent (leaf_node, new_leaf_node->KeyAt (0 ), new_leaf_node, transaction, pointer_root_is_latched); assert ((*pointer_root_is_latched) == false ); delete pointer_root_is_latched; leaf_page->WUnlatch (); buffer_pool_manager_->UnpinPage (leaf_page->GetPageId (), true ); buffer_pool_manager_->UnpinPage (new_leaf_node->GetPageId (), true ); return true ; }
Split() 拆分函数,插入时将节点分裂。 如果是叶子节点,调用叶子节点的MoveHalfTo函数,还需维护叶子节点链表的next指针。 如果是非叶子节点,调用内部节点的MoveHalfTo
函数。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 INDEX_TEMPLATE_ARGUMENTS template <typename N>N *BPLUSTREE_TYPE::Split (N *node) { page_id_t new_page_id = INVALID_PAGE_ID; Page *new_page = buffer_pool_manager_->NewPage (&new_page_id); if (nullptr == new_page) { throw std::runtime_error ("out of memory" ); } N *new_node = reinterpret_cast <N *>(new_page->GetData ()); if (node->IsLeafPage ()) { LeafPage *old_leaf_node = reinterpret_cast <LeafPage *>(node); LeafPage *new_leaf_node = reinterpret_cast <LeafPage *>(new_node); new_leaf_node->Init (new_page_id, node->GetParentPageId (), leaf_max_size_); old_leaf_node->MoveHalfTo (new_leaf_node); new_leaf_node->SetNextPageId (old_leaf_node->GetNextPageId ()); old_leaf_node->SetNextPageId (new_leaf_node->GetPageId ()); new_node = reinterpret_cast <N *>(new_leaf_node); } else { InternalPage *old_internal_node = reinterpret_cast <InternalPage *>(node); InternalPage *new_internal_node = reinterpret_cast <InternalPage *>(new_node); new_internal_node->Init (new_page_id, node->GetParentPageId (), internal_max_size_); old_internal_node->MoveHalfTo (new_internal_node, buffer_pool_manager_); new_node = reinterpret_cast <N *>(new_internal_node); } return new_node; }
InsertIntoParent() 拆分(Split)后,向上找到old_node的父结点。 将new_node的第一个key插入到父结点,其位置在 父结点指向old_node的孩子指针之后。 如果插入后>=maxsize,则必须继续拆分父结点,然后在其父结点的父结点再插入,即需要递归。 直到找到的old_node为根结点时,结束递归(此时将会新建一个根R,关键字为key,old_node和new_node为其孩子)。 注意:本函数执行完毕后,new node和old node都需要在函数外面进行unpin。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 INDEX_TEMPLATE_ARGUMENTS void BPLUSTREE_TYPE::InsertIntoParent (BPlusTreePage *old_node, const KeyType &key, BPlusTreePage *new_node, Transaction *transaction, bool *root_is_latched) { if (old_node->IsRootPage ()) { page_id_t new_page_id = INVALID_PAGE_ID; Page *new_page = buffer_pool_manager_->NewPage (&new_page_id); root_page_id_ = new_page_id; InternalPage *new_root_node = reinterpret_cast <InternalPage *>(new_page->GetData ()); new_root_node->Init (new_page_id, INVALID_PAGE_ID, internal_max_size_); new_root_node->PopulateNewRoot (old_node->GetPageId (), key, new_node->GetPageId ()); old_node->SetParentPageId (new_page_id); new_node->SetParentPageId (new_page_id); buffer_pool_manager_->UnpinPage (new_page->GetPageId (), true ); UpdateRootPageId (0 ); if (*root_is_latched) { *root_is_latched = false ; root_latch_.unlock (); } UnlockPages (transaction); return ; } Page *parent_page = buffer_pool_manager_->FetchPage (old_node->GetParentPageId ()); InternalPage *parent_node = reinterpret_cast <InternalPage *>(parent_page->GetData ()); parent_node->InsertNodeAfter (old_node->GetPageId (), key, new_node->GetPageId ()); if (parent_node->GetSize () < parent_node->GetMaxSize ()) { if (*root_is_latched) { *root_is_latched = false ; root_latch_.unlock (); } UnlockPages (transaction); buffer_pool_manager_->UnpinPage (parent_page->GetPageId (), true ); return ; } InternalPage *new_parent_node = Split (parent_node); InsertIntoParent (parent_node, new_parent_node->KeyAt (0 ), new_parent_node, transaction, root_is_latched); 
buffer_pool_manager_->UnpinPage (parent_page->GetPageId (), true ); buffer_pool_manager_->UnpinPage (new_parent_node->GetPageId (), true ); }
Test
$ ./test/b_plus_tree_insert_test
Running main() from gmock_main.cc
[==========] Running 2 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 2 tests from BPlusTreeTests
[ RUN ] BPlusTreeTests.InsertTest1
[ OK ] BPlusTreeTests.InsertTest1 (1 ms)
[ RUN ] BPlusTreeTests.InsertTest2
[ OK ] BPlusTreeTests.InsertTest2 (0 ms)
[----------] 2 tests from BPlusTreeTests (1 ms total)

[----------] Global test environment tear-down
[==========] 2 tests from 1 test suite ran. (1 ms total)
[ PASSED ] 2 tests.