leveldb使用LRU算法来作为缓存的实现。

LRU Cache的结构

LRU算法的实现就是使用一个哈希表记录缓存的数据节点,然后使用一个链表按照访问时间记录数据节点的联系,以便淘汰最近最旧未访问的数据。

class LRUCache {
...
private:	  
	// cache的容量 
	size_t capacity_;  
	  
	// mutex_保护下面状态的锁
	mutable port::Mutex mutex_;  
	size_t usage_ GUARDED_BY(mutex_);  
	  
	// LRU链表的虚拟头节点。这里是一个双向环链表,因此
	// 使用lru.prev指向最新的entry,lru.next指向最旧的entry 
	// refs==1 and in_cache==true 的entry在lru_
	LRUHandle lru_ GUARDED_BY(mutex_);  
	  
	// Dummy head of in-use list.  
	// 正在被使用的链表的头节点
	// refs >= 2 and in_cache==true 的entry在in_use_
	LRUHandle in_use_ GUARDED_BY(mutex_);  
	// 哈希表
	HandleTable table_ GUARDED_BY(mutex_);
}

数据节点

使用下面的结构体来表示数据节点。

struct LRUHandle {  
  void* value;  
  void (*deleter)(const Slice&, void* value);  
  LRUHandle* next_hash;  
  LRUHandle* next;  
  LRUHandle* prev;  
  size_t charge;  // TODO(opt): Only allow uint32_t?  
  size_t key_length;  
  bool in_cache;     // Whether entry is in the cache.  
  uint32_t refs;     // References, including cache reference, if present.  
  uint32_t hash;     // Hash of key(); used for fast sharding and comparisons  
  char key_data[1];  // Beginning of key  
  
  Slice key() const {  
    // next is only equal to this if the LRU handle is the list head of an  
    // empty list. List heads never have meaningful keys.    assert(next != this);  
  
    return Slice(key_data, key_length);  
  }  
};

借助c的一个小技巧,在结构体末尾定义一个数组可以表示变长。也没有什么别的好说的,就是数据部分,双向链表指针部分,作为一个普通的数据节点而已。

不过还具有in_cache表示是否在缓存当中,以及refs说明有多少正在使用本节点的地方。

哈希表

哈希表就是一个数组+链表,使用哈希值确定数组的下标,然后加入到索引当中去。 不过在下面的代码当中,可以看到使用一个二级指针来表示数组+链表的,并没有多大的差别。

// 处于性能考虑,并没有使用stl库中的哈希表,而是自己实现了一个,即简单又高效
class HandleTable {  
...
private:  
   uint32_t length_;  // 数组(桶)的长度
   uint32_t elems_;  // 元素的个数
   LRUHandle** list_;  
};

初始化/扩容

首先来看初始化,也是扩容的逻辑。

  1. 初始化/扩容 的桶大小以4为开始,以大于当前元素个数的最小2的幂结束。
  2. 将就哈希表中的元素重新哈希到新的哈希表当中
  3. 释放旧哈希表的空间。
void Resize() {  
  // 确定新哈希表的桶大小
  uint32_t new_length = 4;  
  while (new_length < elems_) {  
    new_length *= 2;  
  }  
  // 申请新的哈希表的桶并初始化
  LRUHandle** new_list = new LRUHandle*[new_length];  
  memset(new_list, 0, sizeof(new_list[0]) * new_length);  
  uint32_t count = 0;  
  // 将旧哈希表中的元素重哈希到新表当中
  for (uint32_t i = 0; i < length_; i++) {  
    LRUHandle* h = list_[i];  
    while (h != nullptr) {  
      LRUHandle* next = h->next_hash;  
      uint32_t hash = h->hash;  
      LRUHandle** ptr = &new_list[hash & (new_length - 1)];  
      h->next_hash = *ptr;  
      *ptr = h;  
      h = next;  
      count++;  
    }  
  }  
  // 释放旧表的空间,重设新哈希表的容量和指针
  assert(elems_ == count);  
  delete[] list_;  
  list_ = new_list;  
  length_ = new_length;  
}

插入/查询/删除

这三个操作需要定位到给定元素在哈希表的位置,依赖下面的方法

LRUHandle** FindPointer(const Slice& key, uint32_t hash) {  
  LRUHandle** ptr = &list_[hash & (length_ - 1)];  // 根据哈希确定桶
  // 遍历桶中的元素查找哈希值相同并且值相同的元素,返回指针的地址,方便进行修改
  while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {  
    ptr = &(*ptr)->next_hash;  
  }  
  return ptr;  
}

对于插入来说,FindPointer返回的就是可插入的位置,将当前元素插入到所在位置即可,插入之后返回旧的entry的指针。

LRUHandle* Insert(LRUHandle* h) {
  // 要么返回链表指向当前节点的指针的地址,要么返回链表默认的指针的地址
  LRUHandle** ptr = FindPointer(h->key(), h->hash);  
  LRUHandle* old = *ptr;  
  // 插入到当中位置,存在则直接覆盖,不存在添加到链表末尾
  h->next_hash = (old == nullptr ? nullptr : old->next_hash);  
  *ptr = h;  
  if (old == nullptr) {  
	  // 新增需要判断是否扩容
    ++elems_;  
    if (elems_ > length_) {  
      // Since each cache entry is fairly large, we aim for a small  
      // average linked list length (<= 1).      
      Resize();  
    }  
  }  
  return old;  
}

插入就是直接调用一下查找位置的函数

LRUHandle* Lookup(const Slice& key, uint32_t hash) {  
  return *FindPointer(key, hash);  
}

删除操作,找到位置直接直接将指针修改到下一个节点即可,返回被删除的节点的指针

LRUHandle* Remove(const Slice& key, uint32_t hash) {  
  LRUHandle** ptr = FindPointer(key, hash);  
  LRUHandle* result = *ptr;  
  if (result != nullptr) {  
    *ptr = result->next_hash;  
    --elems_;  
  }  
  return result;  
}

LRU Cache功能

最开始已经介绍过了LRU Cache的组成,下面来看提供方法的实现。

插入

插入到哈希表,in_use链表,删除掉旧的cache entry。

Cache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value,  
                                size_t charge,  
                                void (*deleter)(const Slice& key,  
                                                void* value)) {  
  // 整个函数执行期间加锁
  MutexLock l(&mutex_);  
  // 申请cache entry的内存并初始化字段
  LRUHandle* e =  
      reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));  
  e->value = value;  
  e->deleter = deleter;  
  e->charge = charge;  
  e->key_length = key.size();  
  e->hash = hash;  
  e->in_cache = false;  
  e->refs = 1;  // for the returned handle.  
  std::memcpy(e->key_data, key.data(), key.size());  
  
  if (capacity_ > 0) {  
	// 加入到in_use_链表
    e->refs++;  // for the cache's reference.  
    e->in_cache = true;  
    LRU_Append(&in_use_, e);  
    usage_ += charge; 
    // 加入哈希表并删除旧的cache entry(如果存在)
    FinishErase(table_.Insert(e));  
  } else {  // don't cache. (capacity_==0 is supported and turns off caching.)  
    // next is read by key() in an assert, so it must be initialized    e->next = nullptr;  
  }  
  // 超过容量限制,淘汰最旧元素并释放内存直到在容量下
  // 被淘汰的是lru_链表当中最旧的entry
  while (usage_ > capacity_ && lru_.next != &lru_) {  
    LRUHandle* old = lru_.next;  
    assert(old->refs == 1);  
    bool erased = FinishErase(table_.Remove(old->key(), old->hash));  
    if (!erased) {  // to avoid unused variable when compiled NDEBUG  
      assert(erased);  
    }  
  }  
  
  return reinterpret_cast<Cache::Handle*>(e);  
}

查询

从哈希表中查数即可,不过需要将引用entry的引用计数+1。

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {  
  MutexLock l(&mutex_);  
  LRUHandle* e = table_.Lookup(key, hash);  
  if (e != nullptr) {  
    Ref(e);  
  }  
  return reinterpret_cast<Cache::Handle*>(e);  
}

查询到的key需要增加refs计数,如果已经在lru_链表当中的话,还需要从中移除,重新加入到in_use_链表当中

void LRUCache::Ref(LRUHandle* e) {  
  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.  
    LRU_Remove(e);  
    LRU_Append(&in_use_, e);  
  }  
  e->refs++;  
}

通过引用计数可以避免每次查询都操作链表,而是先操作引用计数,并且只在引用计数为特殊值的时候才操作链表。

所以查询的结果使用完之后还需要调用Release来减少引用计数才符合规范,当然插入操作返回的entry也是引用计数+1的,同样需要在使用完成之后进行Release

void LRUCache::Release(Cache::Handle* handle) {  
  MutexLock l(&mutex_);  
  Unref(reinterpret_cast<LRUHandle*>(handle));  
}

当引用计数为0的时候,表示可以释放这个entry了,如果in_cahce为true,refs计数为1则从in_use_移动到lru_链表当中表示使用结束进入淘汰列表了。

void LRUCache::Unref(LRUHandle* e) {  
  assert(e->refs > 0);  
  e->refs--;  
  if (e->refs == 0) {  // Deallocate.  
    assert(!e->in_cache);  
    (*e->deleter)(e->key(), e->value);  
    free(e);  
  } else if (e->in_cache && e->refs == 1) {  
    // No longer in use; move to lru_ list.  
    LRU_Remove(e);  
    LRU_Append(&lru_, e);  
  }  
}

删除

void LRUCache::Erase(const Slice& key, uint32_t hash) {  
  MutexLock l(&mutex_);  
  FinishErase(table_.Remove(key, hash));  
}

从哈希表中移除这个entry,并调用FinishErase进行删除

bool LRUCache::FinishErase(LRUHandle* e) {  
  if (e != nullptr) {  
    assert(e->in_cache);  
    // 先将节点移除,无论是lru_还是in_use_链表
    LRU_Remove(e);  
    e->in_cache = false;  
    usage_ -= e->charge;  
    // 引用计数-1,如果引用
    Unref(e);  
  }  
  return e != nullptr;  
}

总结

  1. LRUCache当中使用哈希表记录了缓存的数据节点,然后使用了两个链表lru和in_use,其中in_use记录当前正在被使用的缓存,lru记录没有被使用的缓存即淘汰队列当中的缓存。
    1. 当内存不足的时候就淘汰lru中最旧的记录即可。
    2. 为了节省内存,缓存的数据只有一份,被哈希表,两个链表,以及其他使用缓存的地方锁共享,因此在缓存的节点当中,记录了哈希桶中链表的指针,以及在lru或in_use两个双向链表当中的前向和后向指针。
  2. 因为避免频繁操作链表,缓存节点引入了refs值,每当读取到缓存的时候,就需要将其引用值加1,缓存使用完成后必须要使用Release进行释放,将引用值减1。当引用值为0的时候,即可以释放缓存的内存了。但是默认情况下,缓存是只有被淘汰的情况下才会释放,其他情况下保留在内存当中即可,这就是lru链表的作用了,在lru链表当中的缓存refs数量都为1,当被引用了之后,从lru中移除加入到in_use当中,引用计数加1,Release的时候ref-1之后变为1了,就从in_use中移除加入到lru当中
  3. 还可以手动删除指定的缓存,为此又引入了in_cache字段,因此删除的缓存可能正在被使用,所以缓存实际被删除是在最后一个引用被释放掉的时候。为了避免refs减为1的时候做不必要的从in_use移动到lru的操作,直接额外使用in_cache作为判断提交来判断是否要执行这个逻辑。