DataFusion uses the object_store crate to connect to storage services. It supports:

  • AWS S3
  • Azure Blob Storage
  • Google Cloud Storage
  • Local Files
  • Memory
  • HTTP/WebDAV Storage
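  • Custom implementations

As the list shows, although the crate is called object_store, it is really a unified interface layer over object storage. Out of the box it provides implementations for several cloud vendors' object storage services, plus local files and memory.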

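Users can plug in a custom storage service by implementing the trait, although the built-in implementations are enough for most cases. Let's look at what the storage interface exposed by object_store looks like.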

ObjectStore trait

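Much like files, object storage boils down to reading and writing objects, with the extra notion of multipart (chunked) concurrent reads and writes on top.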

#[async_trait]  
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {  
    // Upload an object; this is an atomic operation
    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {  
        self.put_opts(location, payload, PutOptions::default())  
            .await  
    }  
  
    // Same as put, but accepts options
    async fn put_opts(  
        &self,  
        location: &Path,  
        payload: PutPayload,  
        opts: PutOptions,  
    ) -> Result<PutResult>;  
  
    // Multipart upload
    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {  
        self.put_multipart_opts(location, PutMultipartOpts::default())  
            .await  
    }  
  
    // Multipart upload with options
    async fn put_multipart_opts(  
        &self,  
        location: &Path,  
        opts: PutMultipartOpts,  
    ) -> Result<Box<dyn MultipartUpload>>;  
  
    // Read an object
    async fn get(&self, location: &Path) -> Result<GetResult> {  
        self.get_opts(location, GetOptions::default()).await  
    }  
  
    // get with options
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;  
  
    // Read the bytes in the given range of the object
    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {  
        let options = GetOptions {  
            range: Some(range.into()),  
            ..Default::default()  
        };  
        self.get_opts(location, options).await?.bytes().await  
    }  
  
    // Read multiple ranges in one call
    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {  
        coalesce_ranges(  
            ranges,  
            |range| self.get_range(location, range),  
            OBJECT_STORE_COALESCE_DEFAULT,  
        )  
        .await  
    }  
  
    // head: fetch the object's metadata
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {  
        let options = GetOptions {  
            head: true,  
            ..Default::default()  
        };  
        Ok(self.get_opts(location, options).await?.meta)  
    }  
  
    // Delete the object at the given location
    async fn delete(&self, location: &Path) -> Result<()>;  
  
    // Delete objects in bulk; the default implementation calls delete for each one, up to 10 at a time
    fn delete_stream<'a>(  
        &'a self,  
        locations: BoxStream<'a, Result<Path>>,  
    ) -> BoxStream<'a, Result<Path>> {  
        locations  
            .map(|location| async {  
                let location = location?;  
                self.delete(&location).await?;  
                Ok(location)  
            })  
            .buffered(10)  
            .boxed()  
    }  
 
    // List objects under the given prefix
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
 
    // Like list, but only returns objects whose location is greater than offset
    fn list_with_offset(  
        &self,  
        prefix: Option<&Path>,  
        offset: &Path,  
    ) -> BoxStream<'static, Result<ObjectMeta>> {  
        let offset = offset.clone();  
        self.list(prefix)  
            .try_filter(move |f| futures::future::ready(f.location > offset))  
            .boxed()  
    }  
 
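    // List with a delimiter: returns the objects and common prefixes (directories) directly under the prefix, without recursing
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;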
 
    // Copy an object
    async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
 
    // Rename: by default this is just a copy followed by deleting the old object
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {  
        self.copy(from, to).await?;  
        self.delete(from).await  
    }  
  
 
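    // Copy only if the destination does not already exist
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;

    // Rename only if the destination does not already exist: copy_if_not_exists followed by delete
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {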
        self.copy_if_not_exists(from, to).await?;  
        self.delete(from).await  
    }  
}
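
The default get_ranges above delegates to coalesce_ranges, which merges nearby ranges so that fewer requests hit the store. The idea can be sketched with the standard library alone. This is a simplified illustration and not the crate's actual code: merge_ranges and its max_gap parameter are hypothetical names, and the sketch assumes that ranges separated by at most max_gap bytes are worth fetching in a single request.

```rust
use std::ops::Range;

/// Merge byte ranges whose gap is at most `max_gap` bytes, so that nearby
/// reads can be served by a single request.
/// Hypothetical helper for illustration; not part of object_store.
pub fn merge_ranges(ranges: &[Range<u64>], max_gap: u64) -> Vec<Range<u64>> {
    // Sort a copy by start offset so that neighbouring ranges are adjacent.
    let mut sorted: Vec<Range<u64>> = ranges.to_vec();
    sorted.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in sorted {
        if let Some(last) = merged.last_mut() {
            // Overlapping or close enough to the previous range: extend it.
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        // Too far away from the previous range: start a new one.
        merged.push(r);
    }
    merged
}
```

Each original range can then be sliced out of the merged fetch it falls into. The trade-off is reading a few unneeded bytes in the gaps in exchange for fewer round trips.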

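Under the hood, the implementation simply turns these calls into HTTP requests. The S3 client below is one example, and the stores for other cloud vendors work the same way.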

#[derive(Debug, Clone)]  
pub struct AmazonS3 {  
    client: Arc<S3Client>,  
}
 
#[derive(Debug)]  
pub(crate) struct S3Client {  
    pub config: S3Config,  
    pub client: HttpClient,  
}
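
To make the mapping to HTTP a little more concrete: a ranged read such as get_range ends up as a GET request carrying a Range header. Below is a minimal sketch of that conversion. The helper name http_range_header is hypothetical and is not the crate's internal API. The one subtlety is that Rust ranges exclude the end offset, while HTTP byte ranges include the last byte.

```rust
use std::ops::Range;

/// Render a half-open Rust byte range as the value of an HTTP `Range` header.
/// HTTP byte ranges are inclusive on both ends, so `0..100` becomes `bytes=0-99`.
/// Returns `None` for an empty range, which has no valid HTTP representation.
/// Hypothetical helper for illustration; not part of object_store.
pub fn http_range_header(range: &Range<u64>) -> Option<String> {
    if range.start >= range.end {
        return None;
    }
    Some(format!("bytes={}-{}", range.start, range.end - 1))
}
```

For example, reading the first 100 bytes of an object would send the header value returned by http_range_header(&(0..100)).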

The in-memory and local file implementations are simpler still: they just modify an in-memory data structure or read and write local files.
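
As a rough illustration of why the in-memory case is simple, here is a deliberately reduced, synchronous model that uses only the standard library. SimpleStore and MemStore are hypothetical names invented for this sketch. The real trait is async and works with Path, PutPayload and Bytes, none of which appear here, and the listing below uses a plain string prefix match, whereas the real crate matches on whole path segments.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Simplified, synchronous stand-in for an object store interface.
/// Hypothetical: the real ObjectStore trait is async and much richer.
pub trait SimpleStore {
    fn put(&mut self, location: &str, data: Vec<u8>);
    fn get(&self, location: &str) -> Option<Vec<u8>>;
    fn delete(&mut self, location: &str) -> bool;
    fn list(&self, prefix: &str) -> Vec<String>;

    /// Default method built on `get`, mirroring how `get_range` is layered
    /// on `get_opts` in the trait above. Out-of-bounds ranges yield `None`.
    fn get_range(&self, location: &str, range: Range<usize>) -> Option<Vec<u8>> {
        let data = self.get(location)?;
        data.get(range).map(|s| s.to_vec())
    }

    /// Default rename: copy to the new key, then delete the old one.
    fn rename(&mut self, from: &str, to: &str) -> bool {
        match self.get(from) {
            Some(data) => {
                self.put(to, data);
                self.delete(from)
            }
            None => false,
        }
    }
}

/// In-memory implementation: every operation is a map lookup or update.
#[derive(Default)]
pub struct MemStore {
    objects: BTreeMap<String, Vec<u8>>,
}

impl SimpleStore for MemStore {
    fn put(&mut self, location: &str, data: Vec<u8>) {
        self.objects.insert(location.to_string(), data);
    }
    fn get(&self, location: &str) -> Option<Vec<u8>> {
        self.objects.get(location).cloned()
    }
    fn delete(&mut self, location: &str) -> bool {
        self.objects.remove(location).is_some()
    }
    fn list(&self, prefix: &str) -> Vec<String> {
        // BTreeMap keeps keys sorted, so the listing comes back in key order.
        self.objects
            .keys()
            .filter(|k| k.starts_with(prefix))
            .cloned()
            .collect()
    }
}
```

Only the four required methods touch the map. get_range and rename come for free from the trait defaults, which is the same layering the ObjectStore trait uses to keep backend implementations small.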

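DataFusion relies entirely on the object_store crate for storage access, and only implements its own object stores where its tests need them. For that reason I won't spend more time on the object_store layer here. If you need the details, read the object_store implementation directly.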