DataFusion uses the object_store crate to access storage services. The crate supports:
- AWS S3
- Azure Blob Storage
- Google Cloud Storage
- Local Files
- Memory
- HTTP/WebDAV Storage
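- Custom implementations
As the list shows, although the crate is named object_store, it is really a unified interface layer over object storage: out of the box it ships implementations for several cloud vendors' object storage services, plus local files, memory, and so on.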
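Users can plug in their own storage service by implementing the trait, although the built-in implementations cover most needs. Next, let's look at the interface object_store exposes for object storage.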
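Before doing so, here is a minimal std-only sketch of what a "unified interface layer" buys: caller code is written once against a trait, and a memory backend or a local-file backend can be swapped in freely. The `Store` trait, `MemoryStore`, `LocalStore` and `roundtrip` are hypothetical simplifications (synchronous, string keys, `io::Result` errors), not the real object_store API, which is async and uses its own `Path`, `PutPayload` and `Result` types.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::Mutex;

/// Hypothetical, stripped-down stand-in for the ObjectStore trait:
/// synchronous, string keys, io::Result errors.
trait Store: Send + Sync {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()>;
    fn get(&self, key: &str) -> io::Result<Vec<u8>>;
}

/// Memory backend: just a map behind a lock.
#[derive(Default)]
struct MemoryStore {
    objects: Mutex<HashMap<String, Vec<u8>>>,
}

impl Store for MemoryStore {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()> {
        self.objects.lock().unwrap().insert(key.to_string(), data.to_vec());
        Ok(())
    }

    fn get(&self, key: &str) -> io::Result<Vec<u8>> {
        self.objects
            .lock()
            .unwrap()
            .get(key)
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, key.to_string()))
    }
}

/// Local-file backend: object keys map to file paths under a root directory.
struct LocalStore {
    root: PathBuf,
}

impl Store for LocalStore {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()> {
        let path = self.root.join(key);
        // Object keys like "a/b.bin" need their parent directories to exist on disk.
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        fs::write(path, data)
    }

    fn get(&self, key: &str) -> io::Result<Vec<u8>> {
        fs::read(self.root.join(key))
    }
}

/// Caller code only sees the trait, so any backend can be plugged in.
fn roundtrip(store: &dyn Store, key: &str, data: &[u8]) -> io::Result<Vec<u8>> {
    store.put(key, data)?;
    store.get(key)
}
```

The same `roundtrip` call works for both backends; a custom storage service would only need one more `impl Store for ...`.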
ObjectStore trait
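Much like files, object storage basically boils down to reading and writing objects, plus the notion of splitting an object into parts that can be read or written concurrently.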
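The multipart idea can be sketched without any cloud SDK: split the payload into fixed-size parts, upload the parts concurrently, and on completion stitch them together in part-number order, so the order in which parts arrive does not matter. `MultipartSession` and `multipart_upload` are hypothetical names; in object_store the equivalent flow goes through the `MultipartUpload` handle returned by `put_multipart`, which talks to the backend's real multipart API instead of an in-memory map.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::thread;

/// Hypothetical in-memory stand-in for a multipart upload session:
/// parts may arrive in any order and are keyed by part number.
#[derive(Default)]
struct MultipartSession {
    parts: Mutex<BTreeMap<usize, Vec<u8>>>,
}

impl MultipartSession {
    fn put_part(&self, index: usize, data: &[u8]) {
        self.parts.lock().unwrap().insert(index, data.to_vec());
    }

    /// "Complete" the upload: concatenate the parts in part-number order.
    /// BTreeMap iterates keys in ascending order, so arrival order is irrelevant.
    fn complete(self) -> Vec<u8> {
        self.parts
            .into_inner()
            .unwrap()
            .into_values()
            .flatten()
            .collect()
    }
}

/// Split `payload` into `part_size` chunks, upload them concurrently
/// (one scoped thread per part), then assemble the final object.
fn multipart_upload(payload: &[u8], part_size: usize) -> Vec<u8> {
    assert!(part_size > 0, "part_size must be non-zero");
    let session = MultipartSession::default();
    thread::scope(|s| {
        for (index, chunk) in payload.chunks(part_size).enumerate() {
            let session = &session;
            s.spawn(move || session.put_part(index, chunk));
        }
    });
    // All scoped threads have joined here, so every part is present.
    session.complete()
}
```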
```rust
// Abridged from the object_store crate (doc comments omitted)
#[async_trait]
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
    // Upload an object. The write is atomic: the whole payload is stored or the call fails
    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
        self.put_opts(location, payload, PutOptions::default())
            .await
    }
    // Same as put, but accepts extra PutOptions
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult>;
    // Start a multipart upload; the returned handle uploads the individual parts
    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
        self.put_multipart_opts(location, PutMultipartOpts::default())
            .await
    }
    // Same as put_multipart, but accepts PutMultipartOpts
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>>;
    // Read an object
    async fn get(&self, location: &Path) -> Result<GetResult> {
        self.get_opts(location, GetOptions::default()).await
    }
    // Same as get, but accepts GetOptions (range, head-only, ...)
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
    // Read only the given byte range of an object
    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
        let options = GetOptions {
            range: Some(range.into()),
            ..Default::default()
        };
        self.get_opts(location, options).await?.bytes().await
    }
    // Read several byte ranges; nearby ranges are coalesced into fewer requests
    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        coalesce_ranges(
            ranges,
            |range| self.get_range(location, range),
            OBJECT_STORE_COALESCE_DEFAULT,
        )
        .await
    }
    // Fetch only the object's metadata (ObjectMeta), via get_opts with head: true
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let options = GetOptions {
            head: true,
            ..Default::default()
        };
        Ok(self.get_opts(location, options).await?.meta)
    }
    // Delete an object
    async fn delete(&self, location: &Path) -> Result<()>;
    // Delete a stream of objects, with at most 10 deletions in flight at a time
    fn delete_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        locations
            .map(|location| async {
                let location = location?;
                self.delete(&location).await?;
                Ok(location)
            })
            .buffered(10)
            .boxed()
    }
    // List all objects under prefix (recursively)
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
    // Like list, but only returns objects whose location sorts after offset
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        let offset = offset.clone();
        self.list(prefix)
            .try_filter(move |f| futures::future::ready(f.location > offset))
            .boxed()
    }
    // Non-recursive list: objects directly under prefix plus the common prefixes
    // ("sub-directories") cut at the next delimiter
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
    // Copy an object (an existing destination is overwritten)
    async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
    // Copy an object only if the destination does not exist yet
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
    // Rename: by default just a copy followed by deleting the old object
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        self.copy(from, to).await?;
        self.delete(from).await
    }
    // Rename only if the destination does not exist yet
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.copy_if_not_exists(from, to).await?;
        self.delete(from).await
    }
}
```
Under the hood, the S3 implementation simply turns these calls into HTTP requests, and the other cloud vendors' object stores work the same way.
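Among the default methods above, `get_ranges` deserves a closer look: it passes the requested ranges to `coalesce_ranges` together with the `OBJECT_STORE_COALESCE_DEFAULT` threshold, so that ranges lying close together are served by fewer, larger reads, which matters when every read is an HTTP round trip. The std-only sketch below illustrates the idea under simplifying assumptions: `merge_ranges`, `http_range_header` and `extract` are hypothetical helpers rather than object_store APIs, and the real merging logic may differ in details. It also shows how a half-open Rust `Range<u64>` would map onto an HTTP `Range` header, whose byte ranges are inclusive at both ends.

```rust
use std::ops::Range;

/// Merge ranges whose gap to the previous merged range is at most `coalesce` bytes.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted: Vec<Range<u64>> = ranges.to_vec();
    sorted.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in sorted {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous range: extend it instead of issuing a new request.
            if r.start <= last.end.saturating_add(coalesce) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}

/// Render a half-open range as an HTTP Range header value.
/// HTTP byte ranges are inclusive on both ends, hence `end - 1`.
fn http_range_header(r: &Range<u64>) -> String {
    assert!(r.start < r.end, "an empty range has no HTTP byte-range form");
    format!("bytes={}-{}", r.start, r.end - 1)
}

/// Slice each originally requested range back out of the merged fetches.
fn extract(requested: &[Range<u64>], merged: &[Range<u64>], fetched: &[Vec<u8>]) -> Vec<Vec<u8>> {
    requested
        .iter()
        .map(|r| {
            let idx = merged
                .iter()
                .position(|m| m.start <= r.start && r.end <= m.end)
                .expect("requested range must be covered by a merged range");
            let m = &merged[idx];
            let lo = (r.start - m.start) as usize;
            let hi = (r.end - m.start) as usize;
            fetched[idx][lo..hi].to_vec()
        })
        .collect()
}
```

With a 4-byte threshold, requests for `0..10`, `12..20` and `100..110` collapse into two reads, `bytes=0-19` and `bytes=100-109`, and `extract` hands back the three slices the caller originally asked for.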
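For example, AmazonS3 is only a thin wrapper around an S3Client, which holds the S3 configuration and an HTTP client:
```rust
#[derive(Debug, Clone)]
pub struct AmazonS3 {
    client: Arc<S3Client>,
}

#[derive(Debug)]
pub(crate) struct S3Client {
    pub config: S3Config,
    pub client: HttpClient,
}
```
The memory and local-file implementations are simpler still: they just modify an in-memory data structure or read and write local files.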
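To make "just a data structure" concrete, here is a toy synchronous store backed by a `BTreeMap`. It also mirrors the trait's layering: a backend supplies a few required methods and inherits `rename` (copy, then delete), `list_with_offset` and `list_with_delimiter` as defaults. `MiniStore` and `MemStore` are hypothetical and much simpler than object_store's `InMemory`; in particular, the real `list_with_delimiter` is a required method and prefixes are matched per path segment, whereas here the prefix is a plain string assumed to be empty or to end with `/`.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical, synchronous mini version of the trait's layering.
trait MiniStore {
    fn put(&mut self, path: &str, data: Vec<u8>);
    fn get(&self, path: &str) -> Option<Vec<u8>>;
    fn delete(&mut self, path: &str);
    fn copy(&mut self, from: &str, to: &str) -> Result<(), String>;
    /// All object paths under `prefix`, recursively, in lexicographic order.
    fn list(&self, prefix: &str) -> Vec<String>;

    /// Same default as the real trait: copy, then delete the old object.
    fn rename(&mut self, from: &str, to: &str) -> Result<(), String> {
        self.copy(from, to)?;
        self.delete(from);
        Ok(())
    }

    /// Like list, but only paths that sort strictly after `offset`.
    fn list_with_offset(&self, prefix: &str, offset: &str) -> Vec<String> {
        self.list(prefix).into_iter().filter(|p| p.as_str() > offset).collect()
    }

    /// Non-recursive listing: (objects directly under prefix, common prefixes one level down).
    fn list_with_delimiter(&self, prefix: &str) -> (Vec<String>, Vec<String>) {
        let mut objects = Vec::new();
        let mut common = BTreeSet::new();
        for path in self.list(prefix) {
            let rest = &path[prefix.len()..];
            let sub_dir = rest.find('/').map(|i| format!("{}{}", prefix, &rest[..i]));
            match sub_dir {
                Some(dir) => {
                    common.insert(dir);
                }
                None => objects.push(path),
            }
        }
        (objects, common.into_iter().collect())
    }
}

#[derive(Default)]
struct MemStore {
    objects: BTreeMap<String, Vec<u8>>,
}

impl MiniStore for MemStore {
    fn put(&mut self, path: &str, data: Vec<u8>) {
        self.objects.insert(path.to_string(), data);
    }
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.objects.get(path).cloned()
    }
    fn delete(&mut self, path: &str) {
        self.objects.remove(path);
    }
    fn copy(&mut self, from: &str, to: &str) -> Result<(), String> {
        let data = self.get(from).ok_or_else(|| format!("not found: {from}"))?;
        self.put(to, data);
        Ok(())
    }
    fn list(&self, prefix: &str) -> Vec<String> {
        // Keys are kept sorted, so matching paths form one contiguous run starting at `prefix`.
        self.objects
            .range(prefix.to_string()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, _)| k.clone())
            .collect()
    }
}
```

Because the map keeps keys sorted, `list` is a range scan, and the "sorts after offset" check in `list_with_offset` follows the same lexicographic order the listing already uses.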
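DataFusion relies entirely on the object_store crate for storage access, and only implements its own ObjectStore where its tests need one. So I won't dig further into the object_store layer here; if you need the details, read the object_store source directly.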