dafafusion将Table抽象为TableProvider trait。 然后Table的集合抽象为Schema,使用SchemaProvider trait表达。 Schema的集合概念则是Catalog,使用CatalogProvider trait表达。

这三层结构可以这样理解,Catalog是一种物理存储的类型,Schema则是在一个物理存储上的逻辑隔离的概念,Table则是一个Schema下的唯一标识。这三层结构基本上是数据存储系统的标准了。

也就是通过Catalog.Schema.Table可以在datafusion中确认任意一张表,换言之,只要提供了对应的CatalogProvider trait, SchemaProvide trait, TableProvider trait的接口,就可以介入任意一张表到datafusion当中了。

读取一张表的状态信息,也就是访问Catalog.Schema.Table三层结构的状态信息叫做Session,抽象为了Session trait,用于记录上下文信息。

catalog

catalog提供管理schema的能力,以内存实现的catalog为列

// 将schema记录在一个内存当中的哈希表中,然后实现CatalogProvider trait提供管理schema的能力
pub struct MemoryCatalogProvider {
    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}
 
impl CatalogProvider for MemoryCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
	// 返回所有schema名
    fn schema_names(&self) -> Vec<String> {
        self.schemas.iter().map(|s| s.key().clone()).collect()
    }
	// 返回name对应的schema
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.get(name).map(|s| s.value().clone())
    }
	// 注册一个schema
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
        Ok(self.schemas.insert(name.into(), schema))
    }
	// 取消注册schema
    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
        if let Some(schema) = self.schema(name) {
            let table_names = schema.table_names();
            match (table_names.is_empty(), cascade) {
                (true, _) | (false, true) => {
                    let (_, removed) = self.schemas.remove(name).unwrap();
                    Ok(Some(removed))
                }
                (false, false) => exec_err!(
                    "Cannot drop schema {} because other tables depend on it: {}",
                    name,
                    itertools::join(table_names.iter(), ", ")
                ),
            }
        } else {
            Ok(None)
        }
    }
}

schema

schema需要实现SchemaProvider trait,提供管理Table的能力。与catalog类似,看一下内存实现的schema,将Table在内存中保存。

pub struct MemorySchemaProvider {
    tables: DashMap<String, Arc<dyn TableProvider>>,
}
 
impl SchemaProvider for MemorySchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
	// 获取所有表名
    fn table_names(&self) -> Vec<String> {
        self.tables
            .iter()
            .map(|table| table.key().clone())
            .collect()
    }
	// 返回name指定的表
    async fn table(
        &self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        Ok(self.tables.get(name).map(|table| table.value().clone()))
    }
	// 注册表
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        if self.table_exist(name.as_str()) {
            return exec_err!("The table {name} already exists");
        }
        Ok(self.tables.insert(name, table))
    }
	// 取消注册表
    fn deregister_table(
        &self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.remove(name).map(|(_, table)| table))
    }
	// 判断表是否存在
    fn table_exist(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
}

Table

datafusion中的表需要实现TableProvider trait。还是以内存表为例进行说明

// 将表的元数据信息以及数据都保存在内存当中
pub struct MemTable {
    schema: SchemaRef,
    pub(crate) batches: Vec<PartitionData>,
    constraints: Constraints,
    column_defaults: HashMap<String, Expr>,
    /// Optional pre-known sort order(s). Must be `SortExpr`s.
    /// inserting data into this table removes the order
    pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}
 
impl TableProvider for MemTable {
    fn as_any(&self) -> &dyn Any {
        self
    }
	// 返回表的schema结构
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
	// 表的约束信息
    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }
	// 表的类型
    fn table_type(&self) -> TableType {
        TableType::Base
    }
	// 读取表,也就是返回一个读取表的执行计划
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut partitions = vec![];
        for arc_inner_vec in self.batches.iter() {
            let inner_vec = arc_inner_vec.read().await;
            partitions.push(inner_vec.clone())
        }
 
        let mut exec =
            MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
 
        let show_sizes = state.config_options().explain.show_sizes;
        exec = exec.with_show_sizes(show_sizes);
 
        // add sort information if present
        let sort_order = self.sort_order.lock();
        if !sort_order.is_empty() {
            let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
 
            let file_sort_order = sort_order
                .iter()
                .map(|sort_exprs| {
                    create_physical_sort_exprs(
                        sort_exprs,
                        &df_schema,
                        state.execution_props(),
                    )
                })
                .collect::<Result<Vec<_>>>()?;
            exec = exec.try_with_sort_information(file_sort_order)?;
        }
 
        Ok(Arc::new(exec))
    }
 
    /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
    ///
    /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
    ///
    /// # Arguments
    ///
    /// * `state` - The [`SessionState`] containing the context for executing the plan.
    /// * `input` - The [`ExecutionPlan`] to execute and insert.
    ///
    /// # Returns
    ///
    /// * A plan that returns the number of rows written.
    // 同样返回一个执行计划,不过这个执行计划是从input读数,写入到表中的
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // If we are inserting into the table, any sort order may be messed up so reset it here
        *self.sort_order.lock() = vec![];
 
        // Create a physical plan from the logical plan.
        // Check that the schema of the plan matches the schema of this table.
        if !self
            .schema()
            .logically_equivalent_names_and_types(&input.schema())
        {
            return plan_err!(
                "Inserting query must have the same schema with the table. \
                Expected: {:?}, got: {:?}",
                self.schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>(),
                input
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>()
            );
        }
        if insert_op != InsertOp::Append {
            return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
        }
        let sink = Arc::new(MemSink::new(self.batches.clone()));
        Ok(Arc::new(DataSinkExec::new(
            input,
            sink,
            self.schema.clone(),
            None,
        )))
    }
	// 获取column的默认值
    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}

可以知道,通过三层结构既能够唯一确定一张表,同时还为datafusion扩展数据源提供了良好的能力,这也是现在数据领域最常见的Catalog服务。

具体到Table上了,可以看到只关注表的结构信息,以及向上层提供从表中读取数据和写数据的scan与insert_into方法就足够了。任何复杂的计算最终都要落到这两点。

dynamic_file Catalog实现

datafusion提供的DynamicFileCatalog是一个统一的包装入口,封装了在DataFusion里的统一入口,有为每一种Catalog扩展了Datafusion自己提供的表格式。

// Catalog的包装
struct DynamicFileCatalogProvider {
    /// 这里是用户自己注册的Catalog
    inner: Arc<dyn CatalogProvider>,
    /// 这里是Datafusion为此Catalog扩展的UrlTable表支持
    factory: Arc<dyn UrlTableFactory>,
}
 
impl CatalogProvider for DynamicFileCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
 
    fn schema_names(&self) -> Vec<String> {
        self.inner.schema_names()
    }
 
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.inner.schema(name).map(|schema| {
            Arc::new(DynamicFileSchemaProvider::new(
                schema,
                Arc::clone(&self.factory),
            )) as _
        })
    }
 
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.register_schema(name, schema)
    }
}

在datafusion当中是支持接入多种存储系统的,因此Catalog是可以有多个的,因此CatalogList才是实际上的第一层入口,每个Catalog就是一个存储系统的数据源接入入口。

// Catalog list包装
pub struct DynamicFileCatalog {
    /// The inner catalog provider list
    inner: Arc<dyn CatalogProviderList>,
    /// The factory that can create a table provider from the file path
    factory: Arc<dyn UrlTableFactory>,
}
 
impl CatalogProviderList for DynamicFileCatalog {
    fn as_any(&self) -> &dyn Any {
        self
    }
 
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        self.inner.register_catalog(name, catalog)
    }
 
    fn catalog_names(&self) -> Vec<String> {
        self.inner.catalog_names()
    }
 
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.inner.catalog(name).map(|catalog| {
            Arc::new(DynamicFileCatalogProvider::new(
                catalog,
                Arc::clone(&self.factory),
            )) as _
        })
    }
}

接下来就是schema

pub struct DynamicFileSchemaProvider {
    /// The inner schema provider
    inner: Arc<dyn SchemaProvider>,
    /// The factory that can create a table provider from the file path
    factory: Arc<dyn UrlTableFactory>,
}
 
impl SchemaProvider for DynamicFileSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
 
    fn table_names(&self) -> Vec<String> {
        self.inner.table_names()
    }
 
    async fn table(
        &self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        if let Some(table) = self.inner.table(name).await? {
            return Ok(Some(table));
        };
		// 可以看到,当注册的schema找不到表,就尝试使用扩展的UrlTable表的的方式进行查找
        self.factory.try_new(name).await
    }
 
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        self.inner.register_table(name, table)
    }
 
    fn deregister_table(
        &self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        self.inner.deregister_table(name)
    }
 
    fn table_exist(&self, name: &str) -> bool {
        self.inner.table_exist(name)
    }
}

UrlTableFactory的默认实现即为DynamicListTableFactory,对接默认的数据源ListingTable,是Datafusion支持的一种表格式,可以为所有接入Datafusion的数据源进行扩展支持。

impl UrlTableFactory for DynamicListTableFactory {
    async fn try_new(&self, url: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        let Ok(table_url) = ListingTableUrl::parse(url) else {
            return Ok(None);
        };
 
        let state = &self
            .session_store()
            .get_session()
            .upgrade()
            .and_then(|session| {
                session
                    .read()
                    .as_any()
                    .downcast_ref::<SessionState>()
                    .cloned()
            })
            .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
 
        match ListingTableConfig::new(table_url.clone())
            .infer_options(state)
            .await
        {
            Ok(cfg) => {
                let cfg = cfg
                    .infer_partitions_from_path(state)
                    .await?
                    .infer_schema(state)
                    .await?;
                ListingTable::try_new(cfg)
                    .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
            }
            Err(_) => Ok(None),
        }
    }
}

执行过程中的状态信息以及入口等都存储在Session中,实现为DF中的实现为SessionState

pub struct SessionState {
	......
    /// Session当中CatalogProviderList作为所有数据渊的入口
    catalog_list: Arc<dyn CatalogProviderList>,
	......
}

给予SessionState,DF又提供了SessionContext,类似于Spark的session一样,作为DF引擎对外的接口

pub struct SessionContext {
    /// UUID for the session
    session_id: String,
    /// Session start time
    session_start_time: DateTime<Utc>,
    /// Shared session state for the session
    state: Arc<RwLock<SessionState>>,
}