level当中的数据存储格式分为当前可写的MemTable,不可变的MemTable,以及文件当中的Table。

首先来介绍一下这几种数据格式,然后以数据的流向为视角,了解整个数据链路。

Table文件

leveldb当中写入的数据最终都要落到磁盘当中的,这里就先介绍一下数据在磁盘当中的数据存储方式。参照下图进行讲解

Table文件格式图

⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’

Excalidraw Data

Text Elements

磁盘上的持久化的Table文件格式

filter block(可选)

data block

data block

footer

meta index block

index block

数据部分

元数据部分

固定尾部

Link to original
可以看到Table文件的基本单位是block,大致可以分为三块,数据块,元数据块以及固定尾部。

为什么元数据在数据文件后面,这样每次读取文件都需要先移动指针到尾部才能开始读取?

因为元数据的信息是需要获取到所有block之后才能确定的,如果想要将元数据写入文件头的话,那么就需要将所有要写入的数据块缓存在内容当中,然后等待写入结束获取所有的block之后才能开始写入。这样即无法估算内存,也无法进行流式写入,IO操作也会比较阻塞。

  • data block:kv键值对的数据
  • filter block:可选项,如果配置了的话,那么在写入的过程中,可以将所有的key记录在这里,方便读取的时候进行过滤
  • meta index block:将一些配置选项保存在这里,如果filter block选项打开了的话,在这里还会记录filter block在文件当中的偏移位置以及大小。
  • index block:记录了每个data block的上限范围,以及在文件当中的偏移和大小。就像名字一样充当索引的作为,可以根据key快速确定block的位置。
  • footer:固定长度的部分。记录了文件的魔数以及meta index block, index block这两个块在文件中的偏移地址和大小。

Table文件的读取

经过了前面的介绍,其实读取的逻辑也比较清楚了。

  1. 读取固定的尾部footer
  2. 从footer中解析出meta index block, index block的位置信息,读取这两个元数据块
  3. meta index block块还可以将filter block块也读取出来。 到这里其实完成的Table文件的打开操作,也就是完成元数据的读取,因为data block的内容都可以根据元数据的信息定位到,然后从文件当中读取即可。
Status Table::Open(const Options& options, RandomAccessFile* file,  
                   uint64_t size, Table** table) {  
  *table = nullptr;  
  if (size < Footer::kEncodedLength) {  
    return Status::Corruption("file is too short to be an sstable");  
  }  
  // 读取固定尾部footer 
  char footer_space[Footer::kEncodedLength];  
  Slice footer_input;  
  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,  
                        &footer_input, footer_space);  
  if (!s.ok()) return s;  
  // 将尾部信息解码到footer对象当中  
  Footer footer;  
  s = footer.DecodeFrom(&footer_input);  
  if (!s.ok()) return s;  
   
  // 读取index block  
  BlockContents index_block_contents;  
  ReadOptions opt;  
  if (options.paranoid_checks) {  
    opt.verify_checksums = true;  
  }  
  s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);  
  
  if (s.ok()) {  
    // 读取完成footer和index block,就可以准备好服务请求了  
    Block* index_block = new Block(index_block_contents);  
    Rep* rep = new Table::Rep;  
    rep->options = options;  
    rep->file = file;  
    rep->metaindex_handle = footer.metaindex_handle();  
    rep->index_block = index_block;  
    rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);  
    rep->filter_data = nullptr;  
    rep->filter = nullptr;  
    *table = new Table(rep);  
    // 读取meta index block和filter block
    (*table)->ReadMeta(footer);  
  }  
  
  return s;  
}

Table文件的写入

跟读取是一个相反的过程

void TableBuilder::Add(const Slice& key, const Slice& value) {  
  Rep* r = rep_;  
  assert(!r->closed);  
  if (!ok()) return;  
  if (r->num_entries > 0) {  
    // 后添加的key要比前面的key大  
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);  
  }  
  // index block是以块为单位的,所以遇到下一个块的第一个key的时候才能确定上一个块的数据范围以及偏移量和大小
  if (r->pending_index_entry) {  
    assert(r->data_block.empty());  
    // 基于上一个key找到能够比较大小的最短前缀,设置到last_key当中  
    r->options.comparator->FindShortestSeparator(&r->last_key, key);  
    std::string handle_encoding;  
    r->pending_handle.EncodeTo(&handle_encoding);  
    // 读取下一块的第一个key的时候,确定了上一个块的边界,现在将这个边界和数据块在Table文件中的偏移量与长度写入index block当中  
    r->index_block.Add(r->last_key, Slice(handle_encoding));  
    r->pending_index_entry = false;  
  }  
  // 如果设置了filter_block,则每个key也都添加到其中  
  if (r->filter_block != nullptr) {  
    r->filter_block->AddKey(key);  
  }  
  // 更新last_key为最新key  
  r->last_key.assign(key.data(), key.size());  
  r->num_entries++;  
  // 每个键值对都添加到数据块中  
  r->data_block.Add(key, value);  
  // 估算大小,data_block达到设定的块大小之后,刷新到磁盘当中
  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();  
  if (estimated_block_size >= r->options.block_size) {  
    Flush();  
  }  
}

Flush方法负责刷新块到磁盘,同时更新index_block相关的数据以便下一次写入

void TableBuilder::Flush() {  
  Rep* r = rep_;  
  assert(!r->closed);  
  if (!ok()) return;  
  if (r->data_block.empty()) return;  
  assert(!r->pending_index_entry);  
  WriteBlock(&r->data_block, &r->pending_handle);  
  if (ok()) {  
    r->pending_index_entry = true; // block写入磁盘之后刷新此值  
    r->status = r->file->Flush();  
  }  
  if (r->filter_block != nullptr) {  
    r->filter_block->StartBlock(r->offset);  
  }  
}

写入完成之后,会调用Finish方法将元数据块写入其中

Status TableBuilder::Finish() {  
  Rep* r = rep_;  
  Flush();  
  assert(!r->closed);  
  r->closed = true;  
  
  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;  
  
  // Write filter block  
  if (ok() && r->filter_block != nullptr) {  
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,  
                  &filter_block_handle);  
  }  
  
  // Write metaindex block  
  if (ok()) {  
    BlockBuilder meta_index_block(&r->options);  
    if (r->filter_block != nullptr) {  
      // Add mapping from "filter.Name" to location of filter data  
      std::string key = "filter.";  
      key.append(r->options.filter_policy->Name());  
      std::string handle_encoding;  
      filter_block_handle.EncodeTo(&handle_encoding);  
      meta_index_block.Add(key, handle_encoding);  
    }  
  
    // TODO(postrelease): Add stats and other meta blocks  
    WriteBlock(&meta_index_block, &metaindex_block_handle);  
  }

MemTable

Table文件是在磁盘当中的存储文件格式,不过在leveldb进行写入的时候是先写入内存表的,而这个内存表就是由memtable维护的,其底层实现就是一个跳表。

MemTable写入

就是将kv组装成entry格式,然后插入到跳表当中。这个格式也在注释当中写明了。

void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,  
                   const Slice& value) {  
  // Format of an entry is concatenation of:  
  //  key_size     : varint32 of internal_key.size()  
  //  key bytes    : char[internal_key.size()]  
  //  tag          : uint64((sequence << 8) | type)  
  //  value_size   : varint32 of value.size()  
  //  value bytes  : char[value.size()]  
  size_t key_size = key.size();  
  size_t val_size = value.size();  
  size_t internal_key_size = key_size + 8;
  // 计算entry的大小  
  const size_t encoded_len = VarintLength(internal_key_size) +  
                             internal_key_size + VarintLength(val_size) +  
                             val_size;  
  // arene中申请内存
  char* buf = arena_.Allocate(encoded_len);  
  // 按照格式将entry写入申请的内存当中
  char* p = EncodeVarint32(buf, internal_key_size);  
  std::memcpy(p, key.data(), key_size);  
  p += key_size;  
  EncodeFixed64(p, (s << 8) | type);  // tag前56bit表示序列号,后8为表示key的类型
  p += 8;  
  p = EncodeVarint32(p, val_size);  
  std::memcpy(p, value.data(), val_size);  
  assert(p + val_size == buf + encoded_len);  
  table_.Insert(buf);  // 最后将entry写入跳表即可
}

值的注意一下的就是这里代码中的table_并不是前面介绍的Table文件,而是memtable中为跳表定义的类型别名

typedef SkipList<const char*, KeyComparator> Table;
Table table_;

并且分为当前可写的跳表与不可写的跳表。

MemTable* mem_;  
MemTable* imm_ GUARDED_BY(mutex_);  // Memtable being compacted

可以看到mem表示当前的memtable,imm表示不可变的memtable也就是写满了指定的内存大小。 进行写入操作的时候,会先判断mem是否有足够的空间,有则无须额外的操作,没有的话说明mem已经写满,那么mem变为imm,重新申请新的memtable给mem使用。

imm_ = mem_;  
has_imm_.store(true, std::memory_order_release);  
mem_ = new MemTable(internal_comparator_);  
mem_->Ref();  
force = false;  // Do not force another compaction if have room  
MaybeScheduleCompaction();

并尝试启动线程来异步完成合并操作,就是MaybeScheduleCompaction所负责的。

Compact合并

MaybeScheduleCompaction最终是执行了BackgroundCompaction负责合并。 首先是创建一个compaction对象,指定要合并的level和合并的范围

void DBImpl::BackgroundCompaction() {  
  mutex_.AssertHeld();  
  // 合并immutable memtable,同时删除过期的文件
  if (imm_ != nullptr) {  
    CompactMemTable();  
    return;  
  }  
  // 创建compaction对象,分为手动合并和自动合并
  Compaction* c;  
  bool is_manual = (manual_compaction_ != nullptr);  
  InternalKey manual_end;  
  if (is_manual) {  
    ManualCompaction* m = manual_compaction_;  
    // 手动合并则使用手动指定的level和range创建compaction对象  
    c = versions_->CompactRange(m->level, m->begin, m->end);  
    m->done = (c == nullptr);  
    if (c != nullptr) {  
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;  
    }  
    Log(options_.info_log,  
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",  
        m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),  
        (m->end ? m->end->DebugString().c_str() : "(end)"),  
        (m->done ? "(end)" : manual_end.DebugString().c_str()));  
  } else {  
    // 自动合并则挑选  
    c = versions_->PickCompaction();  
  }  
 
  Status status;  
  if (c == nullptr) {  
    // Nothing to do  
  } else if (!is_manual && c->IsTrivialMove()) {  
    // Move file to next level  
    // 将文件移动到下一level  
    assert(c->num_input_files(0) == 1);  
    FileMetaData* f = c->input(0, 0);  
    // 将f从当前level移除,加入到下一level当中  
    c->edit()->RemoveFile(c->level(), f->number);  
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,  
                       f->largest);  
    // 将修改应用到version生效  
    status = versions_->LogAndApply(c->edit(), &mutex_);  
    if (!status.ok()) {  
      RecordBackgroundError(status);  
    }  
    VersionSet::LevelSummaryStorage tmp;  
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",  
        static_cast<unsigned long long>(f->number), c->level() + 1,  
        static_cast<unsigned long long>(f->file_size),  
        status.ToString().c_str(), versions_->LevelSummary(&tmp));  
  } else {  
    // 合并文件的重点操作
    CompactionState* compact = new CompactionState(c);  
    status = DoCompactionWork(compact);  
    if (!status.ok()) {  
      RecordBackgroundError(status);  
    }  
    CleanupCompaction(compact);  
    c->ReleaseInputs();  
    // 删除过时文件,即不会再被引用到的文件
    RemoveObsoleteFiles();  
  }  
  delete c;  
  
  if (status.ok()) {  
    // Done  
  } else if (shutting_down_.load(std::memory_order_acquire)) {  
    // Ignore compaction errors found during shutting down  
  } else {  
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());  
  }  
  
  if (is_manual) {  
    ManualCompaction* m = manual_compaction_;  
    if (!status.ok()) {  
      m->done = true;  
    }  
    if (!m->done) {  
      // We only compacted part of the requested range.  Update *m  
      // to the range that is left to be compacted.      
      m->tmp_storage = manual_end;  
      m->begin = &m->tmp_storage;  
    }  
    manual_compaction_ = nullptr;  
  }  
}