Leveldb针对文件IO场景,定义了三种类型的文件操作接口。

  • SequentialFile:顺序读,如日志文件的读取、Mainfest文件的读取。
  • RandomAccessFile:随机读,如SSTable 文件的读取。
  • WritableFile:顺序写,用于日志文件、SSTable文件、Mainfest文件的写入。

这三个都是纯虚类,具体平台相关的内容由实现类实现。

class SequentialFile {
public:
    SequentialFile() = default;
    virtual ~SequentialFile();
 
    SequentialFile(const SequentialFile&) = delete;
    SequentialFile& operator=(const SequentialFile&) = delete;
 
    virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
    virtual Status Skip(uint64_t n) = 0;
};
 
class RandomAccessFile {
public:
    RandomAccessFile() = default;
    virtual ~RandomAccessFile();
 
    RandomAccessFile(const RandomAccessFile&) = delete;
    RandomAccessFile& operator=(const RandomAccessFile&) = delete;
 
    virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const = 0;
 
};
 
class WritableFile {
public:
    WritableFile();
    virtual ~WritableFile();
 
    WritableFile(const WritableFile&) = delete;
    WritableFile& operator=(const WritableFile&) = delet;
 
    virtual Status Append(const Slice& data) = 0;
    virtual Status Close() = 0;
    virtual Status Flush() = 0;
    virtual Status Sync() = 0;
};

这里可以使用Posix下的实现:

class PosixSequentialFile final : public SequentialFile {
public:
    PosixSequentialFile(std::string filename, int fd)
        : fd_(fd), filename_(std::move(filename)) {}
    ~PosixSequentialFile() override { close(fd_); }
 
    Status Read(size_t n, Slice* result, char* scratch) override {
        Status status;
        while (true) {
            ::ssize_t read_size = ::read(fd_, scratch, n);
            if (read_size < 0) {
                if (errno == EINTR) {
                    continue;
                }
                Status = PosixError(filename_, errno);
                break;
            }
            *result = Slice(scratch, read_size);
            break;
        }
        return status;
    }
 
    Status Skip(uint64_t n) override {
        if (::lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
            return PosixError(filename_, errno);
        }
        return Status::OK();
    }
private:
    const int fd_;
    const std::string filename_;
};

这里使用的一个错误处理函数

Status PosixError(const std::string& context, int error_number) {
  if (error_number == ENOENT) {
    return Status::NotFound(context, std::strerror(error_number));
  } else {
    return Status::IOError(context, std::strerror(error_number));
  }
}

像文件的读写都需要文件描述符,为了避免文件描述符或者其它的某些资源被耗尽,提供了一个辅助类,用来限制资源的使用。

class Limiter {
public:
    Limiter(int max_acquires)
        : max_acquires_(max_acquires),
          acquires_allowed_(max_acquires) {
        assert(max_acquires > 0);
    }
    Limiter(const Limiter&) = delete;
    Limiter& operator=(const Limiter&) = delete;
    
    bool Acquire() {
        int old_acquries_allowed =
            acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);
        if (old_acquries_allowed > 0) return true;
        int pre_increment_acquires_allowed =
            acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
        assert(pre_increment_acquires_allowed < max_acquires_);
    }
 
    void Release() {
        int old_acquires_allowed =
            acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
        assert(old_acquires_allowed < max_acquires_)
    }
private:
    const int max_acquires_;
    std::atomic<int> acquires_allowed_;
};