level当中的数据存储格式分为当前可写的MemTable,不可变的MemTable,以及文件当中的Table。
首先来介绍一下这几种数据格式,然后以数据的流向为视角,了解整个数据链路。
Table文件
leveldb当中写入的数据最终都要落到磁盘当中的,这里就先介绍一下数据在磁盘当中的数据存储方式。参照下图进行讲解
可以看到Table文件的基本单位是block,大致可以分为三块,数据块,元数据块以及固定尾部。Table文件格式图
⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’
Excalidraw Data
Text Elements
磁盘上的持久化的Table文件格式
filter block(可选)
data block
…
data block
footer
meta index block
index block
数据部分
元数据部分
固定尾部
Link to original
为什么元数据在数据文件后面,这样每次读取文件都需要先移动指针到尾部才能开始读取?
因为元数据的信息是需要获取到所有block之后才能确定的,如果想要将元数据写入文件头的话,那么就需要将所有要写入的数据块缓存在内容当中,然后等待写入结束获取所有的block之后才能开始写入。这样即无法估算内存,也无法进行流式写入,IO操作也会比较阻塞。
- data block:kv键值对的数据
- filter block:可选项,如果配置了的话,那么在写入的过程中,可以将所有的key记录在这里,方便读取的时候进行过滤
- meta index block:将一些配置选项保存在这里,如果filter block选项打开了的话,在这里还会记录filter block在文件当中的偏移位置以及大小。
- index block:记录了每个data block的上限范围,以及在文件当中的偏移和大小。就像名字一样充当索引的作为,可以根据key快速确定block的位置。
- footer:固定长度的部分。记录了文件的魔数以及meta index block, index block这两个块在文件中的偏移地址和大小。
Table文件的读取
经过了前面的介绍,其实读取的逻辑也比较清楚了。
- 读取固定的尾部footer
- 从footer中解析出meta index block, index block的位置信息,读取这两个元数据块
- meta index block块还可以将filter block块也读取出来。 到这里其实完成的Table文件的打开操作,也就是完成元数据的读取,因为data block的内容都可以根据元数据的信息定位到,然后从文件当中读取即可。
Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}
// 读取固定尾部footer
char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;
// 将尾部信息解码到footer对象当中
Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;
// 读取index block
BlockContents index_block_contents;
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);
if (s.ok()) {
// 读取完成footer和index block,就可以准备好服务请求了
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
// 读取meta index block和filter block
(*table)->ReadMeta(footer);
}
return s;
}Table文件的写入
跟读取是一个相反的过程
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
// 后添加的key要比前面的key大
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
// index block是以块为单位的,所以遇到下一个块的第一个key的时候才能确定上一个块的数据范围以及偏移量和大小
if (r->pending_index_entry) {
assert(r->data_block.empty());
// 基于上一个key找到能够比较大小的最短前缀,设置到last_key当中
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
// 读取下一块的第一个key的时候,确定了上一个块的边界,现在将这个边界和数据块在Table文件中的偏移量与长度写入index block当中
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
// 如果设置了filter_block,则每个key也都添加到其中
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}
// 更新last_key为最新key
r->last_key.assign(key.data(), key.size());
r->num_entries++;
// 每个键值对都添加到数据块中
r->data_block.Add(key, value);
// 估算大小,data_block达到设定的块大小之后,刷新到磁盘当中
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}Flush方法负责刷新块到磁盘,同时更新index_block相关的数据以便下一次写入
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true; // block写入磁盘之后刷新此值
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}写入完成之后,会调用Finish方法将元数据块写入其中
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}MemTable
Table文件是在磁盘当中的存储文件格式,不过在leveldb进行写入的时候是先写入内存表的,而这个内存表就是由memtable维护的,其底层实现就是一个跳表。
MemTable写入
就是将kv组装成entry格式,然后插入到跳表当中。这个格式也在注释当中写明了。
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
// 计算entry的大小
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
// arene中申请内存
char* buf = arena_.Allocate(encoded_len);
// 按照格式将entry写入申请的内存当中
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type); // tag前56bit表示序列号,后8为表示key的类型
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf); // 最后将entry写入跳表即可
}值的注意一下的就是这里代码中的table_并不是前面介绍的Table文件,而是memtable中为跳表定义的类型别名
typedef SkipList<const char*, KeyComparator> Table;
Table table_;并且分为当前可写的跳表与不可写的跳表。
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted可以看到mem表示当前的memtable,imm表示不可变的memtable也就是写满了指定的内存大小。 进行写入操作的时候,会先判断mem是否有足够的空间,有则无须额外的操作,没有的话说明mem已经写满,那么mem变为imm,重新申请新的memtable给mem使用。
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();并尝试启动线程来异步完成合并操作,就是MaybeScheduleCompaction所负责的。
Compact合并
MaybeScheduleCompaction最终是执行了BackgroundCompaction负责合并。
首先是创建一个compaction对象,指定要合并的level和合并的范围
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
// 合并immutable memtable,同时删除过期的文件
if (imm_ != nullptr) {
CompactMemTable();
return;
}
// 创建compaction对象,分为手动合并和自动合并
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
// 手动合并则使用手动指定的level和range创建compaction对象
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == nullptr);
if (c != nullptr) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
// 自动合并则挑选
c = versions_->PickCompaction();
}
Status status;
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
// 将文件移动到下一level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
// 将f从当前level移除,加入到下一level当中
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
// 将修改应用到version生效
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
// 合并文件的重点操作
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
// 删除过时文件,即不会再被引用到的文件
RemoveObsoleteFiles();
}
delete c;
if (status.ok()) {
// Done
} else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
}
if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = nullptr;
}
}