其实数据源就是Table,即TableProvider的实现。在DF中,到了Table这一层,就不再需要关注底层存储的细节了。Catalog和Schema只是用来隔离Table的一种手段。

但是我们要为DF扩展数据源的话,那么了解数据源的实现方式还是很有必要的。

ListingTable是Datafusion提供的一种表格式。

Datafusion关注的Table是一个逻辑概念,ListingTable作为一种实现方式,采用了一种组织方式来将一组文件组织成一个表。那么就有了下面的问题:

  • 一张表是由哪些文件组织起来的?
  • 文件支持哪些格式?
  • 文件支持从哪些存储中读取?

用DF的文件数据源实现ListingTable来看如何考虑这几个问题?

  • 定义了目录和文件的层级关系,以将多个文件和目录组织成为一张表
  • 将文件格式抽象为FileFormat trait以便扩展
  • 文件的读取称为IO,DF同样将此抽象为了ObjectStore trait,提供统一的IO接口,可以随意对接到任意的存储服务如S3, OSS等等。

即一个ListingTable是通过 Files(有多个文件) FileFormat(支持多种格式) ObjectStore(可接入多种存储服务)。通过IO、文件格式、文件自身的抽象,使得Datafusion可以任意扩展不同的存储、文件格式、存储协议的支持,并且在Datafusion项目中具有统一的上层业务抽象,即Table。

内置文件数据源ListingTable

通过DynamicFileCatalog DynamicFileCatalogProvider DynamicFileSchemaProvider从CatalogList Catalog Schema,将UrlTableFactory向下传递,这样在就可以在schema中获取表的时候增加文件读写的能力,从而为每个catalog下的每个schema都扩展了读取文件的能力。在DynamicFileSchemaProvider的SchemaProvider的实现当中明确体现出来了

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
    async fn table(
        &self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
        if let Some(table) = self.inner.table(name).await? {
            return Ok(Some(table));
        };
 
        self.factory.try_new(name).await
    }
}

那么接下来就是如何实现文件读写的能力了。 UrlTableFactory只有一个实现,DynamicListTableFactory。其功能就是从指定的文件路径当中读取出一张表。文件路径支持本地文件系统路径和网络路径两部分,最终都统一解析为ListingTableUrl来表示

pub struct ListingTableUrl {
    /// A URL that identifies a file or directory to list files from
    url: Url,
    /// The path prefix
    prefix: Path,
    /// An optional glob expression used to filter files
    glob: Option<Pattern>,
}

其中url是符合标准url路径的,例如file:///foo/bars3://bucket/foo/bar。而prefix和glob都是由url解析获得,一个是确定的前缀,一个是用来过滤文件的条件。

有了ListingTableUrl中携带的文件信息,以及session中的信息,最终可以创建出来ListingTable。这个过程像是一步步推导的过程,不过根据已知信息推断出额外的配置,最终获得了表信息。

#[async_trait]
impl UrlTableFactory for DynamicListTableFactory {
    async fn try_new(&self, url: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        let Ok(table_url) = ListingTableUrl::parse(url) else {
            return Ok(None);
        };
 
        let state = &self
            .session_store()
            .get_session()
            .upgrade()
            .and_then(|session| {
                session
                    .read()
                    .as_any()
                    .downcast_ref::<SessionState>()
                    .cloned()
            })
            .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
 
        match ListingTableConfig::new(table_url.clone())
            .infer_options(state)
            .await
        {
            Ok(cfg) => {
                let cfg = cfg
                    .infer_partitions_from_path(state)
                    .await?
                    .infer_schema(state)
                    .await?;
                ListingTable::try_new(cfg)
                    .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
            }
            Err(_) => Ok(None),
        }
    }
}

ListingTable描述了如何将文件组织成为一个表的

/// table1  
/// ├── file1.parquet  
/// └── file2.parquet
/// or 
/// table2  
/// ├── date=2024-06-01  
/// │ ├── file3.parquet  
/// │ └── file4.parquet  
/// └── date=2024-06-02  
/// └── file5.parquet
#[derive(Debug)]
pub struct ListingTable {
    table_paths: Vec<ListingTableUrl>,
    /// File fields only
    file_schema: SchemaRef,
    /// File fields + partition columns
    table_schema: SchemaRef,
    options: ListingOptions,
    definition: Option<String>,
    collected_statistics: FileStatisticsCache,
    constraints: Constraints,
    column_defaults: HashMap<String, Expr>,
}

实现了TableProvider这个trait。能够从中获取表的结构等元数据,最重要的就是对表的读和写操作。即scaninsert_into方法,返回到上层的就是一个ExecutionPlan。

ListingTable的读

ListingTable是由一个文件树中的所有文件组织成的一张表,虽然这个文件树也可以只有一个文件。进行读数无非就是读取文件的内容而已,最重要的也不过是根据过滤条件来过滤掉不需要读取的文件甚至不需要读取的文件部分。

如下便是方法的签名,值得注意的是projection,filters,以及limit都是让文件读取能够提前结束的手段。

async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

下面便具体来看读表的流程。返回的ExecutionPlan关注其execute方法,获取一个可以读取RecordBatch的流失对象

首先,ListingTable是支持分区的,因此第一步当然是尝试过滤掉不必要读的分区。

// 从表的schema信息中提取中分区字典的信息
let table_partition_cols = self
	.options
	.table_partition_cols
	.iter()
	.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
	.collect::<Result<Vec<_>>>()?;
// 从分区字段中再提取出字段名
let table_partition_col_names = table_partition_cols
	.iter()
	.map(|field| field.name().as_str())
	.collect::<Vec<_>>();
// If the filters can be resolved using only partition cols, there is no need to
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
// 将filter划分为可以用于分区过滤的,以及不涉及分区过滤的
let (partition_filters, filters): (Vec<_>, Vec<_>) =
	filters.iter().cloned().partition(|filter| {
		can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
	});

接着当然就是使用分区过滤的filter过滤出需要读取的分区

let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
// 过滤出需要读取的分区以及分区下的文件,partitioned_file_lists是一个二维数组,第一维是分区,第二维就是分区下的文件了
let (mut partitioned_file_lists, statistics) = self
	.list_files_for_scan(session_state, &partition_filters, limit)
	.await?;
 
// if no files need to be read, return an `EmptyExec`
// 分区为空,说明没有数据需要读取,直接返回EmptyExec即可
if partitioned_file_lists.is_empty() {
	let projected_schema = project_schema(&self.schema(), projection)?;
	return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

在裁剪分区收集分区文件的过程中,根据配置可能还会读取文件的元信息已获取统计信息。如果已知文件是已经排序过了,那么接下来可以将文件现在的分组调整为按照最大最小值分组。先将文件按照最小值排序,然后将文件分组为无重叠的多个组。类似于

[1, 3], [2,4], [3, 5], [7,9], [8,9], [10,11]
分组为了
[1,5], [7,9], [10,11]

尝试借助文件已经有序的基础,对文件的读取进行优化

let output_ordering = self.try_create_output_ordering()?;
match state
	.config_options()
	.execution
	.split_file_groups_by_statistics
	.then(|| {
		output_ordering.first().map(|output_ordering| {
			FileScanConfig::split_groups_by_statistics(
				&self.table_schema,
				&partitioned_file_lists,
				output_ordering,
			)
		})
	})
	.flatten()
{
	Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
	Some(Ok(new_groups)) => {
		if new_groups.len() <= self.options.target_partitions {
			partitioned_file_lists = new_groups;
		} else {
			log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
		}
	}
	None => {} // no ordering required
};

接着将filter转为PhysicalExpr,就可以根据现有的文件分组、过滤条件,projection、limit,统计信息来创建ExecutionPlan

let filters = conjunction(filters.to_vec())
	.map(|expr| -> Result<_> {
		// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
		let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
		let filters = create_physical_expr(
			&expr,
			&table_df_schema,
			state.execution_props(),
		)?;
		Ok(Some(filters))
	})
	.unwrap_or(Ok(None))?;
 
let Some(object_store_url) =
	self.table_paths.first().map(ListingTableUrl::object_store)
else {
	return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
 
// create the execution plan
self.options
	.format
	.create_physical_plan(
		session_state,
		FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
			.with_file_groups(partitioned_file_lists)
			.with_statistics(statistics)
			.with_projection(projection.cloned())
			.with_limit(limit)
			.with_output_ordering(output_ordering)
			.with_table_partition_cols(table_partition_cols),
		filters.as_ref(),
	)
	.await

更具体的文件读取,就需要到具体的文件格式当中了。这里就不再进一步了。

ListingTable的写

async fn insert_into(
	&self,
	state: &dyn Session,
	input: Arc<dyn ExecutionPlan>,
	insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> 

从方法的签名就可以看出来,写数就是从一个数据来源input,按照某种方式写入到指定位置的过程。

写入的过程就是要确定写入的文件,以及将数据分发到对应的文件当中去。

其逻辑如下

  • 检查input的schema和ListTable的schema一致。
  • 裁剪分区,也就是获得分区文件列表
  • 然后将写数所需要的信息都封装为FileSinkConfig结构体。知道了写数的配置项、有了数据来源,接下来就是执行写数逻辑了
    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // 校验schema是否匹配
        if !self
            .schema()
            .logically_equivalent_names_and_types(&input.schema())
        {
            // Return an error if schema of the input query does not match with the table schema.
            return plan_err!(
                "Inserting query must have the same schema with the table. \
                Expected: {:?}, got: {:?}",
                self.schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>(),
                input
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>()
            );
        }
 
        let table_path = &self.table_paths()[0];
        if !table_path.is_collection() {
            return plan_err!(
                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
            );
        }
 
        // Get the object store for the table path.
        let store = state.runtime_env().object_store(table_path)?;
 
        // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
        // 裁剪并获取分区文件
        let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
        let file_list_stream = pruned_partition_list(
            session_state,
            store.as_ref(),
            table_path,
            &[],
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;
 
        let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
        let keep_partition_by_columns =
            state.config_options().execution.keep_partition_by_columns;
 
        // Sink related option, apart from format
        // 写入配置
        let config = FileSinkConfig {
            object_store_url: self.table_paths()[0].object_store(),
            table_paths: self.table_paths().clone(),
            file_groups,
            output_schema: self.schema(),
            table_partition_cols: self.options.table_partition_cols.clone(),
            insert_op,
            keep_partition_by_columns,
        };
 
        let order_requirements = if !self.options().file_sort_order.is_empty() {
            // Multiple sort orders in outer vec are equivalent, so we pass only the first one
            let ordering = self
                .try_create_output_ordering()?
                .first()
                .ok_or(DataFusionError::Internal(
                    "Expected ListingTable to have a sort order, but none found!".into(),
                ))?
                .clone();
            // Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
            Some(LexRequirement::new(
                ordering
                    .into_iter()
                    .map(PhysicalSortRequirement::from)
                    .collect::<Vec<_>>(),
            ))
        } else {
            None
        };
		// 创建执行计划,等待执行写数
        self.options()
            .format
            .create_writer_physical_plan(input, session_state, config, order_requirements)
            .await
    }

File一节当中了解单个逻辑文件的读写,ListingTable就在再此基础上再将多个文件组织成表,从ListingTable的读和写来看,最终都是调用FileFormat的读和写方法转为ExecutionPlan的。