DataFusion abstracts a table as the TableProvider trait. A collection of tables is then abstracted as a schema, expressed through the SchemaProvider trait, and a collection of schemas forms a catalog, expressed through the CatalogProvider trait.
This three-level structure can be understood as follows: a catalog corresponds to one kind of physical storage, a schema is a unit of logical isolation within that storage, and a table is a unique identifier within a schema. This hierarchy is essentially the standard layout for data storage systems.
In other words, Catalog.Schema.Table pins down any table in DataFusion: as long as you implement the corresponding CatalogProvider, SchemaProvider, and TableProvider traits, any table can be plugged into DataFusion.
The state involved in reading a table, that is, in accessing the Catalog.Schema.Table hierarchy, is called a session; it is abstracted as the Session trait and records the execution context.
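A minimal sketch of wiring the three levels together (assuming a recent DataFusion release; exact import paths vary by version): register a catalog, a schema, and an in-memory table, then resolve the table by its fully qualified name.
use std::sync::Arc;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider};
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // A one-column in-memory table with a single partition.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    // Wire up the three levels: catalog -> schema -> table.
    let schema_provider = Arc::new(MemorySchemaProvider::new());
    schema_provider.register_table("t".to_string(), Arc::new(table))?;
    let catalog = Arc::new(MemoryCatalogProvider::new());
    catalog.register_schema("my_schema", schema_provider)?;
    ctx.register_catalog("my_catalog", catalog);
    // The fully qualified name resolves through all three levels.
    ctx.sql("SELECT a FROM my_catalog.my_schema.t").await?.show().await?;
    Ok(())
}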
catalog
A catalog provides the ability to manage schemas. Take the in-memory catalog implementation as an example:
// Record schemas in an in-memory hash map; implementing the CatalogProvider trait then provides the schema-management capability
pub struct MemoryCatalogProvider {
schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}
impl CatalogProvider for MemoryCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
// Return the names of all schemas
fn schema_names(&self) -> Vec<String> {
self.schemas.iter().map(|s| s.key().clone()).collect()
}
// Return the schema registered under `name`
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.schemas.get(name).map(|s| s.value().clone())
}
// Register a schema
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.schemas.insert(name.into(), schema))
}
// Deregister a schema
fn deregister_schema(
&self,
name: &str,
cascade: bool,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
if let Some(schema) = self.schema(name) {
let table_names = schema.table_names();
match (table_names.is_empty(), cascade) {
(true, _) | (false, true) => {
let (_, removed) = self.schemas.remove(name).unwrap();
Ok(Some(removed))
}
(false, false) => exec_err!(
"Cannot drop schema {} because other tables depend on it: {}",
name,
itertools::join(table_names.iter(), ", ")
),
}
} else {
Ok(None)
}
}
}
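Used on its own, the in-memory catalog behaves as the match arms above suggest; a small usage sketch (the schema name is hypothetical):
use std::sync::Arc;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider};

fn main() -> datafusion::error::Result<()> {
    let catalog = MemoryCatalogProvider::new();
    catalog.register_schema("sales", Arc::new(MemorySchemaProvider::new()))?;
    assert_eq!(catalog.schema_names(), vec!["sales".to_string()]);
    // The schema holds no tables, so it can be dropped even with cascade = false;
    // a schema that still holds tables would return an error instead.
    let removed = catalog.deregister_schema("sales", false)?;
    assert!(removed.is_some());
    Ok(())
}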
schema
A schema implements the SchemaProvider trait, which provides the ability to manage tables. As with the catalog, let's look at the in-memory schema implementation, which keeps its tables in memory.
pub struct MemorySchemaProvider {
tables: DashMap<String, Arc<dyn TableProvider>>,
}
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
// Return the names of all tables
fn table_names(&self) -> Vec<String> {
self.tables
.iter()
.map(|table| table.key().clone())
.collect()
}
// Return the table registered under `name`
async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.tables.get(name).map(|table| table.value().clone()))
}
// Register a table
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if self.table_exist(name.as_str()) {
return exec_err!("The table {name} already exists");
}
Ok(self.tables.insert(name, table))
}
// Deregister a table
fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.remove(name).map(|(_, table)| table))
}
// Check whether a table exists
fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}
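A usage sketch for the in-memory schema (EmptyTable is a placeholder TableProvider that ships with DataFusion; the table name is hypothetical and import paths may differ by version):
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{MemorySchemaProvider, SchemaProvider};
use datafusion::datasource::empty::EmptyTable;

fn main() -> datafusion::error::Result<()> {
    let provider = MemorySchemaProvider::new();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let table = Arc::new(EmptyTable::new(schema));
    provider.register_table("t".to_string(), table.clone())?;
    assert!(provider.table_exist("t"));
    // A second registration under the same name hits the "already exists" branch above.
    assert!(provider.register_table("t".to_string(), table).is_err());
    Ok(())
}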
Table
A table in DataFusion implements the TableProvider trait. Again, the in-memory table serves as the example.
// Keeps both the table's metadata and its data in memory
pub struct MemTable {
schema: SchemaRef,
pub(crate) batches: Vec<PartitionData>,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
/// inserting data into this table removes the order
pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}
impl TableProvider for MemTable {
fn as_any(&self) -> &dyn Any {
self
}
// Return the table's schema
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
// The table's constraints
fn constraints(&self) -> Option<&Constraints> {
Some(&self.constraints)
}
// The table's type
fn table_type(&self) -> TableType {
TableType::Base
}
// Scan the table, i.e. return an execution plan that reads from it
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut partitions = vec![];
for arc_inner_vec in self.batches.iter() {
let inner_vec = arc_inner_vec.read().await;
partitions.push(inner_vec.clone())
}
let mut exec =
MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
let show_sizes = state.config_options().explain.show_sizes;
exec = exec.with_show_sizes(show_sizes);
// add sort information if present
let sort_order = self.sort_order.lock();
if !sort_order.is_empty() {
let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
let file_sort_order = sort_order
.iter()
.map(|sort_exprs| {
create_physical_sort_exprs(
sort_exprs,
&df_schema,
state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
exec = exec.try_with_sort_information(file_sort_order)?;
}
Ok(Arc::new(exec))
}
/// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
///
/// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
///
/// # Arguments
///
/// * `state` - The [`SessionState`] containing the context for executing the plan.
/// * `input` - The [`ExecutionPlan`] to execute and insert.
///
/// # Returns
///
/// * A plan that returns the number of rows written.
// Also returns an execution plan, but one that reads from `input` and writes into this table
async fn insert_into(
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// If we are inserting into the table, any sort order may be messed up so reset it here
*self.sort_order.lock() = vec![];
// Create a physical plan from the logical plan.
// Check that the schema of the plan matches the schema of this table.
if !self
.schema()
.logically_equivalent_names_and_types(&input.schema())
{
return plan_err!(
"Inserting query must have the same schema with the table. \
Expected: {:?}, got: {:?}",
self.schema()
.fields()
.iter()
.map(|field| field.data_type())
.collect::<Vec<_>>(),
input
.schema()
.fields()
.iter()
.map(|field| field.data_type())
.collect::<Vec<_>>()
);
}
if insert_op != InsertOp::Append {
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
Ok(Arc::new(DataSinkExec::new(
input,
sink,
self.schema.clone(),
None,
)))
}
// Return the default value of a column
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
}
As we can see, the three-level structure both uniquely identifies a table and gives DataFusion a clean extension point for new data sources; this is also the most common form of catalog service in today's data ecosystem.
At the level of an individual table, it is enough to expose the table's schema information, plus the scan and insert_into methods that let upper layers read data from and write data to the table. Any complex computation ultimately comes down to these two operations.
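A minimal end-to-end sketch that drives both methods through SQL, assuming a recent DataFusion release: the INSERT is planned via insert_into (the Append path above) and the SELECT via scan.
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    // An empty single-partition MemTable; rows arrive through insert_into.
    let table = MemTable::try_new(schema, vec![vec![]])?;
    ctx.register_table("t", Arc::new(table))?;
    // Executes the DataSinkExec plan returned by insert_into.
    ctx.sql("INSERT INTO t VALUES (1), (2)").await?.collect().await?;
    // Executes the MemoryExec plan returned by scan.
    ctx.sql("SELECT a FROM t").await?.show().await?;
    Ok(())
}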
dynamic_file Catalog implementation
The DynamicFileCatalog provided by DataFusion is a unified wrapper around the catalog entry points: it wraps each registered catalog and extends it with support for the file-backed table format that DataFusion itself provides.
// Wrapper around a catalog
struct DynamicFileCatalogProvider {
/// The catalog registered by the user
inner: Arc<dyn CatalogProvider>,
/// The UrlTable support that DataFusion adds on top of this catalog
factory: Arc<dyn UrlTableFactory>,
}
impl CatalogProvider for DynamicFileCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.inner.schema_names()
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.inner.schema(name).map(|schema| {
Arc::new(DynamicFileSchemaProvider::new(
schema,
Arc::clone(&self.factory),
)) as _
})
}
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
self.inner.register_schema(name, schema)
}
}
DataFusion supports connecting to multiple storage systems, so there can be more than one catalog. The CatalogProviderList is therefore the real first-level entry point, and each catalog is the access point for one storage system's data sources.
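For example, one SessionContext can front several storage systems at once, each behind its own catalog (the catalog names are hypothetical, and in-memory catalogs stand in for real backends):
use std::sync::Arc;
use datafusion::catalog::MemoryCatalogProvider;
use datafusion::prelude::SessionContext;

fn main() {
    let ctx = SessionContext::new();
    // Each catalog is the entry point for one storage system.
    ctx.register_catalog("postgres_mirror", Arc::new(MemoryCatalogProvider::new()));
    ctx.register_catalog("s3_lake", Arc::new(MemoryCatalogProvider::new()));
    // The default "datafusion" catalog is created by the session itself.
    assert_eq!(ctx.catalog_names().len(), 3);
}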
// Wrapper around the catalog provider list
pub struct DynamicFileCatalog {
/// The inner catalog provider list
inner: Arc<dyn CatalogProviderList>,
/// The factory that can create a table provider from the file path
factory: Arc<dyn UrlTableFactory>,
}
impl CatalogProviderList for DynamicFileCatalog {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
self.inner.register_catalog(name, catalog)
}
fn catalog_names(&self) -> Vec<String> {
self.inner.catalog_names()
}
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.inner.catalog(name).map(|catalog| {
Arc::new(DynamicFileCatalogProvider::new(
catalog,
Arc::clone(&self.factory),
)) as _
})
}
}
Next comes the schema layer.
pub struct DynamicFileSchemaProvider {
/// The inner schema provider
inner: Arc<dyn SchemaProvider>,
/// The factory that can create a table provider from the file path
factory: Arc<dyn UrlTableFactory>,
}
impl SchemaProvider for DynamicFileSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.inner.table_names()
}
async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if let Some(table) = self.inner.table(name).await? {
return Ok(Some(table));
};
// When the wrapped schema cannot find the table, fall back to resolving the name as a URL table
self.factory.try_new(name).await
}
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.inner.register_table(name, table)
}
fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.inner.deregister_table(name)
}
fn table_exist(&self, name: &str) -> bool {
self.inner.table_exist(name)
}
}
The default implementation of UrlTableFactory is DynamicListTableFactory, which is backed by ListingTable, the default file-based data source: a table format provided by DataFusion itself that can extend any data source plugged into DataFusion.
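In practice, this fallback is what lets a file path be queried directly as a table name. A sketch, assuming a DataFusion version that exposes SessionContext::enable_url_table (the parquet path is hypothetical):
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // enable_url_table wraps the catalog list in DynamicFileCatalog.
    let ctx = SessionContext::new().enable_url_table();
    // No registered table matches this name, so the schema lookup falls
    // through to UrlTableFactory, which builds a ListingTable from the path.
    ctx.sql("SELECT * FROM 'data/example.parquet'").await?.show().await?;
    Ok(())
}
The default factory implementation that powers this lookup is shown below.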
impl UrlTableFactory for DynamicListTableFactory {
async fn try_new(&self, url: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let Ok(table_url) = ListingTableUrl::parse(url) else {
return Ok(None);
};
let state = &self
.session_store()
.get_session()
.upgrade()
.and_then(|session| {
session
.read()
.as_any()
.downcast_ref::<SessionState>()
.cloned()
})
.ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
match ListingTableConfig::new(table_url.clone())
.infer_options(state)
.await
{
Ok(cfg) => {
let cfg = cfg
.infer_partitions_from_path(state)
.await?
.infer_schema(state)
.await?;
ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
}
Err(_) => Ok(None),
}
}
}
The state used during execution, together with the entry points described above, lives in the session, which DataFusion implements as SessionState.
pub struct SessionState {
// ...
/// The CatalogProviderList in the session is the entry point for all data sources
catalog_list: Arc<dyn CatalogProviderList>,
// ...
}
On top of SessionState, DataFusion also provides SessionContext, which, much like Spark's session, serves as the engine's external interface.
pub struct SessionContext {
/// UUID for the session
session_id: String,
/// Session start time
session_start_time: DateTime<Utc>,
/// Shared session state for the session
state: Arc<RwLock<SessionState>>,
}
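Walking the hierarchy back down from a SessionContext ties everything together; a small sketch (the default catalog and schema names, "datafusion" and "public", come from the default session configuration):
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use datafusion::prelude::SessionContext;

fn main() {
    let ctx = SessionContext::new();
    // context -> catalog -> schema, mirroring Catalog.Schema.Table resolution.
    let catalog = ctx.catalog("datafusion").expect("default catalog");
    let schema = catalog.schema("public").expect("default schema");
    assert!(schema.table_names().is_empty());
}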