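LogicalPlan is the structure DataFusion uses to describe a computation, and it is one of the most important core components. Any interface that can be converted into a LogicalPlan can be integrated with DataFusion.
The SQL and DataFrame interfaces built into DataFusion work exactly this way. Beyond those, proto is another such description, which makes it easy to run DataFusion as a standalone service. The proto and sql modules under datafusion both follow this pattern. DataFrame, by contrast, lives directly in the core module, and its implementation also translates straight into operations on a LogicalPlan: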
pub struct DataFrame {
// Box the (large) SessionState to reduce the size of DataFrame on the stack
session_state: Box<SessionState>,
plan: LogicalPlan,
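}
LogicalPlan therefore sits in the middle, connecting the user-facing interfaces above it with the execution below it. Following it as a thread makes the rest of DataFusion much easier to sort out.
LogicalPlan is a tree structure, expressed in Rust as an enum. Its definition can be found in datafusion/expr/src/logical_plan/plan.rs: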
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
Projection(Projection),
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the
/// input; If the value of `<predicate>` is true, the input row is
/// passed to the output. If the value of `<predicate>` is false
/// (or null), the row is discarded.
Filter(Filter),
/// Windows input based on a set of window spec and window
/// function (e.g. SUM or RANK). This is used to implement SQL
/// window functions, and the `OVER` clause.
Window(Window),
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM). This is used to implement SQL aggregates
/// and `GROUP BY`.
Aggregate(Aggregate),
/// Sorts its input according to a list of sort expressions. This
/// is used to implement SQL `ORDER BY`
Sort(Sort),
/// Join two logical plans on one or more join columns.
/// This is used to implement SQL `JOIN`
Join(Join),
/// Repartitions the input based on a partitioning scheme. This is
/// used to add parallelism and is sometimes referred to as an
/// "exchange" operator in other systems
Repartition(Repartition),
/// Union multiple inputs with the same schema into a single
/// output stream. This is used to implement SQL `UNION [ALL]` and
/// `INTERSECT [ALL]`.
Union(Union),
/// Produces rows from a [`TableSource`], used to implement SQL
/// `FROM` tables or views.
TableScan(TableScan),
/// Produces no rows: An empty relation with an empty schema that
/// produces 0 or 1 row. This is used to implement SQL `SELECT`
/// that has no values in the `FROM` clause.
EmptyRelation(EmptyRelation),
/// Produces the output of running another query. This is used to
/// implement SQL subqueries
Subquery(Subquery),
/// Aliased relation provides, or changes, the name of a relation.
SubqueryAlias(SubqueryAlias),
/// Skip some number of rows, and then fetch some number of rows.
Limit(Limit),
/// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
Statement(Statement),
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details. This is used to implement SQL such as
/// `VALUES (1, 2), (3, 4)`
Values(Values),
/// Produces a relation with string representations of
/// various parts of the plan. This is used to implement SQL `EXPLAIN`.
Explain(Explain),
/// Runs the input, and prints annotated physical plan as a string
/// with execution metric. This is used to implement SQL
/// `EXPLAIN ANALYZE`.
Analyze(Analyze),
/// Extension operator defined outside of DataFusion. This is used
/// to extend DataFusion with custom relational operations that
Extension(Extension),
/// Remove duplicate rows from the input. This is used to
/// implement SQL `SELECT DISTINCT ...`.
Distinct(Distinct),
/// Prepare a statement and find any bind parameters
/// (e.g. `?`). This is used to implement SQL-prepared statements.
Prepare(Prepare),
/// Data Manipulation Language (DML): Insert / Update / Delete
Dml(DmlStatement),
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
Ddl(DdlStatement),
/// `COPY TO` for writing plan results to files
Copy(CopyTo),
/// Describe the schema of the table. This is used to implement the
/// SQL `DESCRIBE` command from MySQL.
DescribeTable(DescribeTable),
/// Unnest a column that contains a nested list type such as an
/// ARRAY. This is used to implement SQL `UNNEST`
Unnest(Unnest),
/// A variadic query (e.g. "Recursive CTEs")
RecursiveQuery(RecursiveQuery),
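}
Each enum variant is a node type. Combining and nesting these nodes is enough to describe essentially any computation. Besides the computational nodes, there are also some special nodes that define structure, extensions, and special operations, for example:
- DdlStatement
- DmlStatement
- Statement
- CopyTo
- Extension
The first step is to get to know each node type, and then to see how these types are used to describe arbitrarily complex computations.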
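To make the idea of nesting nodes concrete, here is a minimal std-only sketch. `ToyPlan`, `render`, and `example_plan` are hypothetical names invented for this illustration and are not DataFusion types; the real enum holds `Arc<LogicalPlan>` inputs and `Expr` values where this sketch uses `Box` and plain strings.

```rust
// Toy stand-ins for illustration only; these are NOT DataFusion's real types.
#[derive(Debug)]
enum ToyPlan {
    /// Leaf node: reads rows from a named table.
    TableScan { table: String },
    /// Keeps only the rows of `input` that satisfy `predicate`.
    Filter { predicate: String, input: Box<ToyPlan> },
    /// Evaluates `exprs` against every row of `input`.
    Projection { exprs: Vec<String>, input: Box<ToyPlan> },
}

/// Renders the tree with one node per line, children indented under parents.
fn render(plan: &ToyPlan, depth: usize, out: &mut String) {
    let pad = "  ".repeat(depth);
    match plan {
        ToyPlan::TableScan { table } => {
            out.push_str(&format!("{pad}TableScan: {table}\n"));
        }
        ToyPlan::Filter { predicate, input } => {
            out.push_str(&format!("{pad}Filter: {predicate}\n"));
            render(input, depth + 1, out);
        }
        ToyPlan::Projection { exprs, input } => {
            out.push_str(&format!("{pad}Projection: {}\n", exprs.join(", ")));
            render(input, depth + 1, out);
        }
    }
}

/// Builds the tree for `SELECT name FROM users WHERE age > 18`.
fn example_plan() -> ToyPlan {
    ToyPlan::Projection {
        exprs: vec!["name".to_string()],
        input: Box::new(ToyPlan::Filter {
            predicate: "age > 18".to_string(),
            input: Box::new(ToyPlan::TableScan {
                table: "users".to_string(),
            }),
        }),
    }
}
```

Rendering `example_plan()` from depth 0 gives three lines, Projection on top, Filter beneath it, and TableScan at the leaf, which mirrors how data flows upward from the scan to the final output.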
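LogicalPlan Node Overview
Projection Node
Evaluates an arbitrary list of expressions. This corresponds to the column list of a SELECT statement, where every column is in fact an expression.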
pub struct Projection {
/// The list of expressions
pub expr: Vec<Expr>,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The schema description of the output
pub schema: DFSchemaRef,
}
Every expression in the expr list is evaluated on top of input. The final output conforms to schema.
Filter Node
This is the WHERE clause: it drops the input rows that do not match the predicate expression.
pub struct Filter {
/// The predicate expression, which must have Boolean type.
pub predicate: Expr,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The flag to indicate if the filter is a having clause
pub having: bool,
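}
Window Node
A Window node evaluates window functions. It also evaluates expressions over the input data, but each evaluation sees a window of related rows: an ordinary function is computed from a single row, whereas a window function is computed from a group of related rows.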
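The difference between a per-row function and a window function can be sketched with plain Rust. This is only an illustration under simplifying assumptions: `double_each` and `running_sum` are hypothetical helpers, the partition key is a string, and the window is "all earlier rows of the same partition, in input order".

```rust
use std::collections::HashMap;

/// Scalar-style function: each output depends only on its own input row.
fn double_each(values: &[i64]) -> Vec<i64> {
    values.iter().map(|v| v * 2).collect()
}

/// Window-style function: a running SUM per partition key, in input order.
/// Each input row still produces exactly one output row.
fn running_sum(rows: &[(&str, i64)]) -> Vec<i64> {
    let mut totals: HashMap<&str, i64> = HashMap::new();
    rows.iter()
        .map(|(key, value)| {
            // The result for this row depends on earlier rows with the same key.
            let total = totals.entry(*key).or_insert(0);
            *total += value;
            *total
        })
        .collect()
}
```

Unlike an aggregate, the window function does not collapse rows: four input rows produce four output values.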
pub struct Window {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The window function expression
pub window_expr: Vec<Expr>,
/// The schema description of the window output
pub schema: DFSchemaRef,
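}
Aggregate Node
An Aggregate node has two parts, the grouping keys and the aggregate expressions, and performs the aggregation over the rows that fall into the same group. Both parts are evaluated as expressions.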
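A minimal sketch of the grouping-plus-aggregation idea, assuming a single string grouping key and SUM as the only aggregate. `sum_by_group` is a hypothetical helper; the sorted output order comes from `BTreeMap` in this toy and says nothing about the order DataFusion produces.

```rust
use std::collections::BTreeMap;

/// Groups rows by key (the role of the grouping expressions) and sums the
/// values in each group (the role of the aggregate expressions).
/// The result has one row per distinct key.
fn sum_by_group(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut groups: BTreeMap<&str, i64> = BTreeMap::new();
    for (key, value) in rows {
        *groups.entry(*key).or_insert(0) += value;
    }
    groups
        .into_iter()
        .map(|(key, sum)| (key.to_string(), sum))
        .collect()
}
```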
pub struct Aggregate {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// Grouping expressions
pub group_expr: Vec<Expr>,
/// Aggregate expressions
pub aggr_expr: Vec<Expr>,
/// The schema description of the aggregate output
pub schema: DFSchemaRef,
}
Sort Node
Evaluates the sort expressions for each record, then orders the records in ascending or descending order.
pub struct Sort {
/// The sort expressions
pub expr: Vec<SortExpr>,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// Optional fetch limit
pub fetch: Option<usize>,
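}
The expression type used here is SortExpr, a thin wrapper around Expr that adds per-key options such as ascending or descending order and how NULLs are handled. In the expr module the struct itself is named Sort: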
pub struct Sort {
/// The expression to sort on
pub expr: Expr,
/// The direction of the sort
pub asc: bool,
/// Whether to put Nulls before all other data values
pub nulls_first: bool,
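}
What is the fetch field in Sort for?
Most likely it lets the sort finish early: when a LIMIT is pushed down into the sort, only the first fetch rows need to be produced.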
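The guess above can be illustrated with a small std-only sketch. `ToySortOptions` and `sort_with_fetch` are hypothetical names; the sketch sorts everything and then truncates, whereas an engine that knows `fetch` up front could keep only the best `fetch` rows while sorting.

```rust
/// Hypothetical stand-in for one sort key's options
/// (direction and NULL placement).
struct ToySortOptions {
    asc: bool,
    nulls_first: bool,
}

/// Sorts nullable integers and keeps at most `fetch` rows, if a fetch is given.
fn sort_with_fetch(
    mut values: Vec<Option<i64>>,
    opts: &ToySortOptions,
    fetch: Option<usize>,
) -> Vec<Option<i64>> {
    use std::cmp::Ordering;
    values.sort_by(|a, b| match (a, b) {
        (None, None) => Ordering::Equal,
        // NULL placement is independent of the sort direction.
        (None, Some(_)) => {
            if opts.nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if opts.nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => {
            if opts.asc { x.cmp(y) } else { y.cmp(x) }
        }
    });
    // With a fetch, everything past the first `n` rows is never needed.
    if let Some(n) = fetch {
        values.truncate(n);
    }
    values
}
```

Because only the first `fetch` rows survive, the rows that were sorted and then discarded represent work a limit-aware sort could skip.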
Join Node
Joins two tables, the left and right inputs.
pub struct Join {
/// Left input
pub left: Arc<LogicalPlan>,
/// Right input
pub right: Arc<LogicalPlan>,
/// Equijoin clause expressed as pairs of (left, right) join expressions
pub on: Vec<(Expr, Expr)>,
/// Filters applied during join (non-equi conditions)
pub filter: Option<Expr>,
/// Join type
pub join_type: JoinType,
/// Join constraint
pub join_constraint: JoinConstraint,
/// The output schema, containing fields from the left and right inputs
pub schema: DFSchemaRef,
/// If null_equals_null is true, null == null else null != null
pub null_equals_null: bool,
}
The equality conditions of the join are the expression pairs in the on field. Non-equality conditions go in filter.
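The effect of an equality condition together with `null_equals_null` can be sketched as follows. This is a simplified nested-loop inner join over a single nullable key; `toy_inner_join` is a hypothetical helper, and the real node carries a list of expression pairs plus an optional extra filter rather than one integer key.

```rust
/// Inner equijoin on a single nullable key, written as a nested loop.
/// `null_equals_null` decides whether two NULL keys match each other.
fn toy_inner_join(
    left: &[(Option<i64>, &str)],
    right: &[(Option<i64>, &str)],
    null_equals_null: bool,
) -> Vec<(String, String)> {
    let mut out = Vec::new();
    for (left_key, left_val) in left {
        for (right_key, right_val) in right {
            let keys_match = match (left_key, right_key) {
                (Some(a), Some(b)) => a == b,
                (None, None) => null_equals_null,
                // Exactly one side is NULL: never a match.
                _ => false,
            };
            if keys_match {
                out.push((left_val.to_string(), right_val.to_string()));
            }
        }
    }
    out
}
```

With `null_equals_null` set to false, rows whose key is NULL on both sides are dropped; with true, they pair up like any other equal keys.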
Repartition Node
Repartitions the data of input according to the specified scheme.
pub struct Repartition {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The partitioning scheme
pub partitioning_scheme: Partitioning,
}
The partitioning schemes currently supported are round-robin, hash, and expression-based (DISTRIBUTE BY) partitioning:
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified number
/// of partitions.
Hash(Vec<Expr>, usize),
/// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
DistributeBy(Vec<Expr>),
}
Union Node
Combines multiple inputs into one.
pub struct Union {
/// Inputs to merge
pub inputs: Vec<Arc<LogicalPlan>>,
/// Union schema. Should be the same for all inputs.
pub schema: DFSchemaRef,
}
TableScan Node
This is more or less the lowest-level node. It is responsible for reading the data.
pub struct TableScan {
/// The name of the table
pub table_name: TableReference,
/// The source of the table
pub source: Arc<dyn TableSource>,
/// Optional column indices to use as a projection
pub projection: Option<Vec<usize>>,
/// The schema description of the output
pub projected_schema: DFSchemaRef,
/// Optional expressions to be used as filters by the table provider
pub filters: Vec<Expr>,
/// Optional number of rows to read
pub fetch: Option<usize>,
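}
Subquery Node
Implements subqueries. A subquery is itself just another query description, so this node simply holds that plan, plus the outer-query columns that the subquery references.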
pub struct Subquery {
/// The subquery
pub subquery: Arc<LogicalPlan>,
/// The outer references used in the subquery
pub outer_ref_columns: Vec<Expr>,
}
SubqueryAlias Node
Works together with Subquery: it gives a subquery (or any other relation) an alias that can be used as a table name.
pub struct SubqueryAlias {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The alias for the input relation
pub alias: TableReference,
/// The schema with qualified field names
pub schema: DFSchemaRef,
}
Limit Node
Skips a number of rows, then keeps only the next n rows.
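The skip-then-fetch behavior can be sketched with iterator adapters. `apply_limit` is a hypothetical helper that uses plain integers for illustration; the Limit struct itself stores both values as optional expressions.

```rust
/// Applies skip-then-fetch semantics to a sequence of rows:
/// `skip` rows are dropped first, then at most `fetch` rows are kept.
/// `None` for `fetch` means "keep everything that remains".
fn apply_limit<T: Clone>(rows: &[T], skip: usize, fetch: Option<usize>) -> Vec<T> {
    let remaining = rows.iter().skip(skip).cloned();
    match fetch {
        Some(n) => remaining.take(n).collect(),
        None => remaining.collect(),
    }
}
```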
pub struct Limit {
/// Number of rows to skip before fetch
pub skip: Option<Box<Expr>>,
/// Maximum number of rows to fetch,
/// None means fetching all rows
pub fetch: Option<Box<Expr>>,
/// The logical plan
pub input: Arc<LogicalPlan>,
}
Statement Node
Beyond the computation that a SQL query describes, DataFusion also supports statements such as SET VARIABLE or START TRANSACTION. That is what Statement is for.
pub enum Statement {
// Begin a transaction
TransactionStart(TransactionStart),
// Commit or rollback a transaction
TransactionEnd(TransactionEnd),
/// Set a Variable
SetVariable(SetVariable),
}
Values Node
Similar to the SQL VALUES clause: it returns a set of literal rows.
pub struct Values {
/// The table schema
pub schema: DFSchemaRef,
/// Values
pub values: Vec<Vec<Expr>>,
}
Explain Node
Prints the LogicalPlan structure as strings.
pub struct Explain {
/// Should extra (detailed, intermediate plans) be included?
pub verbose: bool,
/// The logical plan that is being EXPLAIN'd
pub plan: Arc<LogicalPlan>,
/// Represent the various stages plans have gone through
pub stringified_plans: Vec<StringifiedPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
/// Used by physical planner to check if should proceed with planning
pub logical_optimization_succeeded: bool,
}
Analyze Node
This node actually runs the final physical plan and prints that plan annotated with the execution metrics.
pub struct Analyze {
/// Should extra detail be included?
pub verbose: bool,
/// The logical plan that is being EXPLAIN ANALYZE'd
pub input: Arc<LogicalPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
}
Extension Node
Extends DataFusion with custom, user-defined nodes.
pub struct Extension {
/// The runtime extension operator
pub node: Arc<dyn UserDefinedLogicalNode>,
}
Distinct Node
Removes duplicate rows.
pub enum Distinct {
/// Plain `DISTINCT` referencing all selection expressions
All(Arc<LogicalPlan>),
/// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
On(DistinctOn),
}
RecursiveQuery Node
Implements recursive CTE queries.
pub struct RecursiveQuery {
/// Name of the query
pub name: String,
/// The static term (initial contents of the working table)
pub static_term: Arc<LogicalPlan>,
/// The recursive term (evaluated on the contents of the working table until
/// it returns an empty set)
pub recursive_term: Arc<LogicalPlan>,
/// Should the output of the recursive term be deduplicated (`UNION`) or
/// not (`UNION ALL`).
pub is_distinct: bool,
}
Evaluation starts from static_term, then recursive_term is evaluated repeatedly until it returns an empty set.
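The iteration described above can be sketched over plain integers. `eval_recursive` is a hypothetical helper, not DataFusion's implementation: the static term is a vector of seed rows, and the recursive term is a closure applied to the rows produced by the previous iteration.

```rust
use std::collections::HashSet;

/// Evaluates a toy recursive query over integers.
/// `static_term` seeds the working table; `recursive_term` is re-run on the
/// previous iteration's rows until no rows remain.
fn eval_recursive(
    static_term: Vec<i64>,
    recursive_term: impl Fn(&[i64]) -> Vec<i64>,
    is_distinct: bool,
) -> Vec<i64> {
    let mut seen: HashSet<i64> = HashSet::new();
    let mut result = Vec::new();
    let mut working = static_term;
    loop {
        if is_distinct {
            // UNION semantics: drop rows that were already produced.
            working.retain(|row| seen.insert(*row));
        }
        if working.is_empty() {
            break;
        }
        result.extend_from_slice(&working);
        working = recursive_term(&working);
    }
    result
}
```

A seed of `[1]` with a recursive term that emits `n + 1` for every `n < 5` corresponds to the classic `WITH RECURSIVE` counter and yields the numbers 1 through 5. With `is_distinct` set, a recursive term that cycles through the same values still terminates, because already-seen rows are removed from the working table.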
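In DataFusion, tree structures are abstracted by the TreeNode trait. Whether it is LogicalPlan, PhysicalExpr, or ExecutionPlan, every tree-shaped structure has to implement this trait. Understanding this trait first helps with all of the related descriptions; for details, see the article on the TreeNode tree structure.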
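The value of a shared tree abstraction can be sketched with a much smaller trait. `ToyTreeNode`, `Node`, and `names_preorder` are hypothetical names for this illustration; DataFusion's real TreeNode trait has a richer API than the single traversal shown here.

```rust
/// Hypothetical, heavily simplified stand-in for a tree abstraction.
trait ToyTreeNode {
    /// Returns the direct children of this node.
    fn children(&self) -> Vec<&Self>;

    /// Visits this node first, then all of its descendants (pre-order).
    fn visit<F: FnMut(&Self)>(&self, f: &mut F) {
        f(self);
        for child in self.children() {
            child.visit(&mut *f);
        }
    }
}

/// A toy plan node with zero, one, or two inputs.
enum Node {
    Leaf(&'static str),
    Unary(&'static str, Box<Node>),
    Binary(&'static str, Box<Node>, Box<Node>),
}

impl Node {
    fn name(&self) -> &'static str {
        match self {
            Node::Leaf(n) | Node::Unary(n, _) | Node::Binary(n, _, _) => *n,
        }
    }
}

impl ToyTreeNode for Node {
    fn children(&self) -> Vec<&Self> {
        match self {
            Node::Leaf(_) => vec![],
            Node::Unary(_, child) => vec![child.as_ref()],
            Node::Binary(_, left, right) => vec![left.as_ref(), right.as_ref()],
        }
    }
}

/// Collects node names in pre-order using only the trait's traversal.
fn names_preorder(root: &Node) -> Vec<&'static str> {
    let mut names = Vec::new();
    root.visit(&mut |node: &Node| names.push(node.name()));
    names
}
```

Only `children` is specific to the node type. The traversal is written once in the trait, so any other tree type that implements `children` gets it for free.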
Expr
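Besides nodes, another concept appears constantly in these descriptions: Expr.
In plain terms, an expression is an operation that takes zero or more input arguments and produces a single result. This is also how it differs from LogicalPlan: an expression yields a single definite value, while a plan yields a block of data.
Put another way, apart from the keywords, every slot that can be filled in a SQL statement is an expression. The field types of the LogicalPlan variants above show this: expressions are used wherever something needs to be evaluated.
The variants of the Expr enum below also show that it is a self-referential (recursive) type.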
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum Expr {
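/// An expression with an alias
Alias(Alias),
/// A column reference
Column(Column),
/// A scalar variable
ScalarVariable(DataType, Vec<String>),
/// A constant (literal) value
Literal(ScalarValue),
/// A binary operator expression
BinaryExpr(BinaryExpr),
/// LIKE expression
Like(Like),
/// LIKE-style match using a regular expression (SQL `SIMILAR TO`)
SimilarTo(Like),
/// Logical NOT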
Not(Box<Expr>),
/// True if argument is not NULL, false otherwise. This expression itself is never NULL.
IsNotNull(Box<Expr>),
/// True if argument is NULL, false otherwise. This expression itself is never NULL.
IsNull(Box<Expr>),
/// True if argument is true, false otherwise. This expression itself is never NULL.
IsTrue(Box<Expr>),
/// True if argument is false, false otherwise. This expression itself is never NULL.
IsFalse(Box<Expr>),
/// True if argument is NULL, false otherwise. This expression itself is never NULL.
IsUnknown(Box<Expr>),
/// True if argument is FALSE or NULL, false otherwise. This expression itself is never NULL.
IsNotTrue(Box<Expr>),
/// True if argument is TRUE OR NULL, false otherwise. This expression itself is never NULL.
IsNotFalse(Box<Expr>),
/// True if argument is TRUE or FALSE, false otherwise. This expression itself is never NULL.
IsNotUnknown(Box<Expr>),
/// arithmetic negation of an expression, the operand must be of a signed numeric data type
Negative(Box<Expr>),
/// Whether an expression is between a given range.
Between(Between),
/// The CASE expression is similar to a series of nested if/else and there are two forms that
/// can be used. The first form consists of a series of boolean "when" expressions with
/// corresponding "then" expressions, and an optional "else" expression.
///
/// ```text
/// CASE WHEN condition THEN result
/// [WHEN ...]
/// [ELSE result]
/// END
/// ```
///
/// The second form uses a base expression and then a series of "when" clauses that match on a
/// literal value.
///
/// ```text
/// CASE expression
/// WHEN value THEN result
/// [WHEN ...]
/// [ELSE result]
/// END
/// ```
Case(Case),
/// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
/// This expression is guaranteed to have a fixed type.
Cast(Cast),
/// Casts the expression to a given type and will return a null value if the expression cannot be cast.
/// This expression is guaranteed to have a fixed type.
TryCast(TryCast),
/// Represents the call of a scalar function with a set of arguments.
ScalarFunction(ScalarFunction),
/// Calls an aggregate function with arguments, and optional
/// `ORDER BY`, `FILTER`, `DISTINCT` and `NULL TREATMENT`.
///
/// See also [`ExprFunctionExt`] to set these fields.
///
/// [`ExprFunctionExt`]: crate::expr_fn::ExprFunctionExt
AggregateFunction(AggregateFunction),
/// Represents the call of a window function with arguments.
WindowFunction(WindowFunction),
/// Returns whether the list contains the expr value.
InList(InList),
/// EXISTS subquery
Exists(Exists),
/// IN subquery
InSubquery(InSubquery),
/// Scalar subquery
ScalarSubquery(Subquery),
/// Represents a reference to all available fields in a specific schema,
/// with an optional (schema) qualifier.
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
Wildcard {
qualifier: Option<TableReference>,
options: WildcardOptions,
},
/// List of grouping set expressions. Only valid in the context of an aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
/// A place holder for parameters in a prepared statement
/// (e.g. `$foo` or `$1`)
Placeholder(Placeholder),
/// A place holder which hold a reference to a qualified field
/// in the outer query, used for correlated sub queries.
OuterReferenceColumn(DataType, Column),
/// Unnest expression
Unnest(Unnest),
}
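Because the type is recursive, evaluating an expression is naturally a recursive function. The sketch below uses a tiny hypothetical `ToyExpr` with a handful of variants modeled loosely on the enum above; it is not DataFusion's evaluator. It assumes rows of nullable integers and represents the boolean result of `IsNull` as 1 or 0.

```rust
/// Toy expression tree: a tiny, hypothetical subset of expression kinds.
enum ToyExpr {
    /// Value of the column at this index in the row.
    Column(usize),
    /// Constant; `None` models SQL NULL.
    Literal(Option<i64>),
    /// Sum of two sub-expressions.
    Add(Box<ToyExpr>, Box<ToyExpr>),
    /// Arithmetic negation of a sub-expression.
    Negative(Box<ToyExpr>),
    /// 1 if the sub-expression is NULL, else 0; never NULL itself.
    IsNull(Box<ToyExpr>),
}

/// Recursively evaluates `expr` against one row. NULL propagates through
/// arithmetic, so any NULL operand makes the result NULL.
fn eval(expr: &ToyExpr, row: &[Option<i64>]) -> Option<i64> {
    match expr {
        ToyExpr::Column(index) => row[*index],
        ToyExpr::Literal(value) => *value,
        ToyExpr::Add(left, right) => Some(eval(left, row)? + eval(right, row)?),
        ToyExpr::Negative(inner) => eval(inner, row).map(|v| -v),
        ToyExpr::IsNull(inner) => Some(if eval(inner, row).is_none() { 1 } else { 0 }),
    }
}
```

Each variant that wraps a `Box<ToyExpr>` recurses into its operands first, which is the same shape the boxed variants of the real enum suggest.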