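LogicalPlan is the structure DataFusion uses to describe a computation, and it is one of its most important core components. Building on it, any interface that can be converted into a LogicalPlan can be integrated with DataFusion.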

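DataFusion's built-in SQL and DataFrame interfaces both work this way. Protobuf is another such description, and it makes it easy to run DataFusion as a standalone service. The proto and sql support live in their own crates under datafusion. The DataFrame API sits directly in the core crate, and its implementation likewise translates each call into an operation on a LogicalPlan: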

pub struct DataFrame {
    // Box the (large) SessionState to reduce the size of DataFrame on the stack
    session_state: Box<SessionState>,
    plan: LogicalPlan,
}

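LogicalPlan therefore bridges the user-facing interfaces above and the planning and execution layers below. Following it as the main thread makes DataFusion's overall design much easier to understand.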
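To make the "DataFrame is just a wrapper around a plan" idea concrete, here is a minimal std-only toy model. It does not use DataFusion's real types. `Plan`, `DataFrame::scan`, `filter`, `select` and `logical_plan` below are simplified, hypothetical stand-ins. Each method consumes the old DataFrame and returns a new one whose plan has the old plan as its input. DataFusion's own `DataFrame::filter` and `DataFrame::select` follow the same idea.

```rust
use std::sync::Arc;

/// Simplified stand-in for LogicalPlan (hypothetical, not DataFusion's type).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    TableScan { table: String },
    Filter { predicate: String, input: Arc<Plan> },
    Projection { exprs: Vec<String>, input: Arc<Plan> },
}

/// Toy DataFrame: little more than a wrapper around a plan.
#[derive(Debug, Clone)]
struct DataFrame {
    plan: Plan,
}

impl DataFrame {
    fn scan(table: &str) -> Self {
        DataFrame { plan: Plan::TableScan { table: table.to_string() } }
    }

    /// Wrap the current plan in a Filter node.
    fn filter(self, predicate: &str) -> Self {
        DataFrame {
            plan: Plan::Filter { predicate: predicate.to_string(), input: Arc::new(self.plan) },
        }
    }

    /// Wrap the current plan in a Projection node.
    fn select(self, exprs: &[&str]) -> Self {
        DataFrame {
            plan: Plan::Projection {
                exprs: exprs.iter().map(|e| e.to_string()).collect(),
                input: Arc::new(self.plan),
            },
        }
    }

    fn logical_plan(&self) -> &Plan {
        &self.plan
    }
}

fn main() {
    // Builder calls read top-down, but the resulting tree nests bottom-up:
    // Projection -> Filter -> TableScan.
    let df = DataFrame::scan("t").filter("a > 1").select(&["a", "b"]);
    println!("{:#?}", df.logical_plan());
}
```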

LogicalPlan is a tree structure, expressed in Rust as an enum. Its definition is in datafusion/expr/src/logical_plan/plan.rs:

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; If the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// Produces no rows: An empty relation with an empty schema that
    /// produces 0 or 1 row. This is used to implement SQL `SELECT`
    /// that has no values in the `FROM` clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations that
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Prepare a statement and find any bind parameters
    /// (e.g. `?`). This is used to implement SQL-prepared statements.
    Prepare(Prepare),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}

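Each enum variant is one node type. Composing and nesting these nodes can describe essentially any computation. Besides computation, there are also some special nodes, for example: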

  • DdlStatement
  • DmlStatement
  • Statement
  • CopyTo
  • Extension

These nodes define schema objects, modify data, run special statements, or extend the plan with custom operators.
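The nesting idea can be sketched with a toy tree. This is a simplified, hypothetical model, not DataFusion's types. The query `SELECT a, SUM(b) FROM t WHERE c > 0 GROUP BY a ORDER BY a LIMIT 10` is built as nested nodes and printed one node per line, with each input indented under its parent, in the spirit of EXPLAIN output. The exact plan shape DataFusion's SQL planner produces may differ.

```rust
/// Toy plan tree (hypothetical): each node owns its single input via Box.
enum Plan {
    TableScan(String),
    Filter(String, Box<Plan>),
    Aggregate { group_by: Vec<String>, aggr: Vec<String>, input: Box<Plan> },
    Projection(Vec<String>, Box<Plan>),
    Sort(Vec<String>, Box<Plan>),
    Limit(usize, Box<Plan>),
}

impl Plan {
    /// Push one line per node into `out`, children indented below parents.
    fn render(&self, depth: usize, out: &mut Vec<String>) {
        let pad = "  ".repeat(depth);
        let (label, child): (String, Option<&Plan>) = match self {
            Plan::TableScan(t) => (format!("TableScan: {t}"), None),
            Plan::Filter(p, i) => (format!("Filter: {p}"), Some(&**i)),
            Plan::Aggregate { group_by, aggr, input } => (
                format!("Aggregate: groupBy=[{}], aggr=[{}]", group_by.join(", "), aggr.join(", ")),
                Some(&**input),
            ),
            Plan::Projection(e, i) => (format!("Projection: {}", e.join(", ")), Some(&**i)),
            Plan::Sort(k, i) => (format!("Sort: {}", k.join(", ")), Some(&**i)),
            Plan::Limit(n, i) => (format!("Limit: fetch={n}"), Some(&**i)),
        };
        out.push(format!("{pad}{label}"));
        if let Some(c) = child {
            c.render(depth + 1, out);
        }
    }
}

/// SELECT a, SUM(b) FROM t WHERE c > 0 GROUP BY a ORDER BY a LIMIT 10
fn build_example() -> Plan {
    let scan = Plan::TableScan("t".to_string());
    let filter = Plan::Filter("c > 0".to_string(), Box::new(scan));
    let agg = Plan::Aggregate {
        group_by: vec!["a".to_string()],
        aggr: vec!["SUM(b)".to_string()],
        input: Box::new(filter),
    };
    let proj = Plan::Projection(vec!["a".to_string(), "SUM(b)".to_string()], Box::new(agg));
    let sort = Plan::Sort(vec!["a ASC".to_string()], Box::new(proj));
    Plan::Limit(10, Box::new(sort))
}

fn main() {
    let mut lines = Vec::new();
    build_example().render(0, &mut lines);
    for l in &lines {
        println!("{l}");
    }
}
```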

The first step is to get to know each node type and see how they combine to describe arbitrarily complex computations.

LogicalPlan Nodes

Projection Node

Evaluates an arbitrary list of expressions. This corresponds to the field list of a SELECT statement, where each field is in fact an expression.

pub struct Projection {
    /// The list of expressions
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}

Every expression in `expr` is evaluated over `input`, and the output matches `schema`.

Filter Node

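This is the WHERE clause: rows from the input that do not match the predicate are filtered out. The `having` flag marks a filter that comes from a HAVING clause.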

pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The flag to indicate if the filter is a having clause
    pub having: bool,
}
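The Filter doc comment in the enum says a row passes only when the predicate is true; false and NULL both discard it. A minimal sketch of that rule follows, assuming SQL booleans are modeled as `Option<bool>` with `None` meaning NULL. `Row`, `a_greater_than` and `apply_filter` are hypothetical names for illustration.

```rust
/// A row whose column `a` may be NULL (None).
#[derive(Debug, Clone, PartialEq)]
struct Row {
    a: Option<i64>,
}

/// `a > threshold` under SQL semantics: comparing NULL yields NULL (None).
fn a_greater_than(row: &Row, threshold: i64) -> Option<bool> {
    row.a.map(|v| v > threshold)
}

/// Keep a row only when the predicate is Some(true); Some(false) and None are dropped.
fn apply_filter(rows: &[Row], threshold: i64) -> Vec<Row> {
    rows.iter()
        .filter(|r| a_greater_than(r, threshold) == Some(true))
        .cloned()
        .collect()
}

fn main() {
    let rows = vec![Row { a: Some(5) }, Row { a: None }, Row { a: Some(0) }];
    println!("{:?}", apply_filter(&rows, 1));
}
```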

Window Node

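Window implements window functions. It also evaluates expressions over its input, but each evaluation sees a window of related rows. An ordinary function computes on the values of a single row, while a window function computes over a group of related rows and still produces one output value per input row.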

pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expression
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
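The difference from an ordinary function can be seen in a toy `SUM(v) OVER (PARTITION BY k)`. Every input row is kept, and each row is annotated with the sum over all rows that share its key. This is a hypothetical std-only sketch, not DataFusion's window operator. Without ORDER BY in the OVER clause, the frame here is assumed to be the whole partition.

```rust
use std::collections::HashMap;

/// Toy SUM(v) OVER (PARTITION BY k): output has one row per input row,
/// each carrying (key, value, partition_total).
fn sum_over_partition(rows: &[(&str, i64)]) -> Vec<(String, i64, i64)> {
    // First pass: compute the total for each partition key.
    let mut totals: HashMap<&str, i64> = HashMap::new();
    for (k, v) in rows {
        *totals.entry(*k).or_insert(0) += *v;
    }
    // Second pass: attach the partition total to every original row.
    rows.iter()
        .map(|(k, v)| (k.to_string(), *v, totals[k]))
        .collect()
}

fn main() {
    for row in sum_over_partition(&[("x", 1), ("y", 10), ("x", 2)]) {
        println!("{row:?}");
    }
}
```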

Aggregate Node

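An Aggregate node has two parts: the grouping keys (`group_expr`) and the aggregate expressions (`aggr_expr`). Rows in the same group are aggregated together, and both parts are expressed as Expr.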

pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
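A toy version of `SELECT k, SUM(v), COUNT(*) ... GROUP BY k` shows how the two parts work together. The key `k` plays the role of `group_expr`, and SUM/COUNT play the role of `aggr_expr`. Unlike a window function, the output has one row per group. This is a hypothetical sketch using a `BTreeMap` (chosen only so the output order is deterministic), not DataFusion's aggregation code.

```rust
use std::collections::BTreeMap;

/// Toy GROUP BY k with SUM(v) and COUNT(*): returns (k, sum, count) per group.
fn group_sum_count(rows: &[(&str, i64)]) -> Vec<(String, i64, usize)> {
    // Accumulator state per group: (running sum, row count).
    let mut groups: BTreeMap<&str, (i64, usize)> = BTreeMap::new();
    for &(k, v) in rows {
        let acc = groups.entry(k).or_insert((0, 0));
        acc.0 += v;
        acc.1 += 1;
    }
    // One output row per group, ordered by key.
    groups
        .into_iter()
        .map(|(k, (sum, count))| (k.to_string(), sum, count))
        .collect()
}

fn main() {
    println!("{:?}", group_sum_count(&[("x", 1), ("y", 10), ("x", 2)]));
}
```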

Sort Node

Evaluates the sort expressions for each record, then orders the records ascending or descending.

pub struct Sort {
    /// The sort expressions
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit
    pub fetch: Option<usize>,
}

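The expressions here are SortExpr, a thin wrapper around Expr (the struct is defined as `Sort` in the expr module). Each sort key carries extra options, such as ascending vs. descending and where NULLs are placed: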

pub struct Sort {
    /// The expression to sort on
    pub expr: Expr,
    /// The direction of the sort
    pub asc: bool,
    /// Whether to put Nulls before all other data values
    pub nulls_first: bool,
}

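What does the `fetch` field in Sort do?

It holds a limit that has been pushed down into the sort. When a Limit sits directly above a Sort (as in `ORDER BY ... LIMIT n`), the optimizer's limit pushdown can fold it into `fetch`. Only the first n rows of the sorted output are then needed, so the sort can be executed as a Top-K instead of fully sorting the input.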
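The SortExpr options and `fetch` can be illustrated with a toy sort over nullable integers. `SortFlags` mirrors the `asc` and `nulls_first` fields above, and `compare` and `sort_with_fetch` are hypothetical helpers. In this sketch, NULL placement is decided by `nulls_first` independently of the direction. For simplicity it does a full sort and then truncates; a real engine can keep only the top `fetch` rows instead.

```rust
use std::cmp::Ordering;

/// Simplified copy of the per-key sort options (hypothetical type).
#[derive(Clone, Copy)]
struct SortFlags {
    asc: bool,
    nulls_first: bool,
}

/// NULLs go first or last per `nulls_first`; only non-null values flip for DESC.
fn compare(a: &Option<i64>, b: &Option<i64>, f: SortFlags) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => if f.nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if f.nulls_first { Ordering::Greater } else { Ordering::Less },
        (Some(x), Some(y)) => if f.asc { x.cmp(y) } else { y.cmp(x) },
    }
}

/// Sort, then keep at most `fetch` rows (None = keep everything).
fn sort_with_fetch(mut values: Vec<Option<i64>>, f: SortFlags, fetch: Option<usize>) -> Vec<Option<i64>> {
    values.sort_by(|a, b| compare(a, b, f));
    if let Some(n) = fetch {
        values.truncate(n);
    }
    values
}

fn main() {
    let v = vec![Some(3), None, Some(1), Some(2)];
    let flags = SortFlags { asc: false, nulls_first: true };
    println!("{:?}", sort_with_fetch(v, flags, Some(2)));
}
```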

Join Node

Joins two inputs (typically two tables).

pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs
    pub schema: DFSchemaRef,
    /// If null_equals_null is true, null == null else null != null
    pub null_equals_null: bool,
}

The equi-join conditions are stored in the `on` field as a list of (left, right) expression pairs. Non-equi conditions go into `filter`.
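The split between `on` and `filter` maps naturally onto a hash join. Below is a hypothetical std-only sketch of an inner join on `left.id = right.id` (the single `on` pair), with a non-equi `filter` applied to each matched pair. It illustrates the logical meaning only. It is not DataFusion's physical join operator, and for outer joins the `filter` semantics differ from simply dropping matched pairs.

```rust
use std::collections::HashMap;

/// Toy inner hash join: left rows are (id, name), right rows are (id, amount).
fn hash_join<F>(left: &[(i64, &str)], right: &[(i64, i64)], filter: F) -> Vec<(i64, String, i64)>
where
    F: Fn(&(i64, &str), &(i64, i64)) -> bool,
{
    // Build phase: index the right side by its equi-join key.
    let mut table: HashMap<i64, Vec<&(i64, i64)>> = HashMap::new();
    for r in right {
        table.entry(r.0).or_default().push(r);
    }
    // Probe phase: look up each left key, then apply the extra (non-equi) filter.
    let mut out = Vec::new();
    for l in left {
        if let Some(matches) = table.get(&l.0) {
            for &r in matches {
                if filter(l, r) {
                    out.push((l.0, l.1.to_string(), r.1));
                }
            }
        }
    }
    out
}

fn main() {
    let left = [(1, "a"), (2, "b"), (3, "c")];
    let right = [(1, 10), (1, 20), (3, 5)];
    // Equivalent to: ON left.id = right.id AND right.amount > 8
    println!("{:?}", hash_join(&left, &right, |_, r| r.1 > 8));
}
```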

Repartition Node

Repartitions the data from `input` according to the specified partitioning scheme.

pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}

The currently supported schemes are round-robin, hash, and expression-based (DISTRIBUTE BY) partitioning:

pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one of more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
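The first two schemes can be sketched in a few lines. `round_robin` mirrors `RoundRobinBatch(n)` by sending batch i to partition i % n. `hash_partition` mirrors `Hash(exprs, n)` by sending each key to partition hash(key) % n, so equal keys always land together. Both are hypothetical helpers, and std's `DefaultHasher` stands in for whatever hash function DataFusion actually uses. `DistributeBy` is not modeled.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Round-robin: batch i goes to partition i % n.
fn round_robin<T: Clone>(batches: &[T], n: usize) -> Vec<Vec<T>> {
    let mut parts = vec![Vec::new(); n];
    for (i, b) in batches.iter().enumerate() {
        parts[i % n].push(b.clone());
    }
    parts
}

/// Hash: each key goes to partition hash(key) % n, so equal keys share a partition.
fn hash_partition<K: Hash + Clone>(keys: &[K], n: usize) -> Vec<Vec<K>> {
    let mut parts = vec![Vec::new(); n];
    for k in keys {
        let mut h = DefaultHasher::new();
        k.hash(&mut h);
        parts[(h.finish() % n as u64) as usize].push(k.clone());
    }
    parts
}

fn main() {
    println!("{:?}", round_robin(&[1, 2, 3, 4, 5], 2));
    println!("{:?}", hash_partition(&["a", "b", "a", "c", "a"], 3));
}
```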

Union Node

Combines multiple inputs that share the same schema into a single output.

pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}

TableScan Node

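This is essentially the lowest-level node: it reads the data. The `projection`, `filters`, and `fetch` fields let column pruning, filter pushdown, and row limits be handed to the table source.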

pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}

Subquery Node

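Used to implement subqueries. A subquery is itself just another query description, so this node simply wraps that plan. It also records the outer-query columns the subquery references (`outer_ref_columns`), which correlated subqueries need.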

pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
}

SubqueryAlias Node

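Gives a relation an alias. It is typically paired with a subquery to name it, as in `FROM (SELECT ...) AS t`, but it can rename any relation.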

pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names
    pub schema: DFSchemaRef,
}
 

Limit Node

Skips `skip` rows, then returns at most `fetch` rows (SQL OFFSET / LIMIT).

pub struct Limit {
    /// Number of rows to skip before fetch
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
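The skip/fetch semantics reduce to `skip` followed by `take`. In this hypothetical sketch both are plain numbers. In the struct above they are `Option<Box<Expr>>`, that is, expressions that must first be evaluated to numbers. `fetch = None` means "all remaining rows", matching the doc comment.

```rust
/// Toy OFFSET/LIMIT: skip `skip` rows, then keep at most `fetch` rows.
fn limit<T: Clone>(rows: &[T], skip: usize, fetch: Option<usize>) -> Vec<T> {
    let remaining = rows.iter().skip(skip).cloned();
    match fetch {
        Some(n) => remaining.take(n).collect(),
        None => remaining.collect(), // no fetch: return everything after the skipped rows
    }
}

fn main() {
    let rows = [1, 2, 3, 4, 5];
    println!("{:?}", limit(&rows, 1, Some(2)));
}
```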

Statement Node

Besides the computation described by SQL queries, DataFusion also supports statements such as SET VARIABLE or START TRANSACTION. Statement represents these.

pub enum Statement {
    // Begin a transaction
    TransactionStart(TransactionStart),
    // Commit or rollback a transaction
    TransactionEnd(TransactionEnd),
    /// Set a Variable
    SetVariable(SetVariable),
}

Values Node

Similar to SQL's VALUES clause: it returns a fixed set of rows, given as literal expressions.

pub struct Values {
    /// The table schema
    pub schema: DFSchemaRef,
    /// Values
    pub values: Vec<Vec<Expr>>,
}

Explain Node

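Renders the structure of a LogicalPlan as strings (SQL EXPLAIN). With `verbose`, the intermediate plans from each planning stage are included as well.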

pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// Represent the various stages plans have gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by physical planner to check if should proceed with planning
    pub logical_optimization_succeeded: bool,
}

Analyze Node

Actually runs the final physical plan and prints it annotated with the metrics collected during execution (SQL EXPLAIN ANALYZE).

pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}

Extension Node

Used to extend DataFusion with custom, user-defined logical nodes.

pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}

Distinct Node

Removes duplicate rows.

pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
    On(DistinctOn),
}

RecursiveQuery Node

Used for recursive CTE queries (WITH RECURSIVE).

pub struct RecursiveQuery {
    /// Name of the query
    pub name: String,
    /// The static term (initial contents of the working table)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}

Evaluation starts from static_term, then recursive_term is run repeatedly against the working table until it returns an empty set.
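That loop can be sketched for the query `WITH RECURSIVE nums(n) AS (SELECT 1 UNION ALL SELECT n + 1 FROM nums WHERE n < limit)`. The static term seeds the working table, and each iteration feeds only the newest rows back into the recursive term. This is a hypothetical std-only model with `is_distinct = false` (UNION ALL, no deduplication), not DataFusion's executor.

```rust
/// Toy WITH RECURSIVE evaluation, producing 1..=limit.
fn recursive_nums(limit: i64) -> Vec<i64> {
    // static_term: SELECT 1
    let static_term: Vec<i64> = vec![1];
    let mut output = static_term.clone();
    let mut working = static_term;
    // Re-run recursive_term on the latest working table until it is empty.
    while !working.is_empty() {
        // recursive_term: SELECT n + 1 FROM working WHERE n < limit
        let next: Vec<i64> = working.iter().filter(|&&n| n < limit).map(|n| n + 1).collect();
        output.extend(&next); // UNION ALL: append without deduplication
        working = next;
    }
    output
}

fn main() {
    println!("{:?}", recursive_nums(5));
}
```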

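In DataFusion, tree structures are abstracted by the TreeNode trait. Tree-shaped types such as LogicalPlan, Expr, PhysicalExpr, and ExecutionPlan implement it. Understanding this trait first helps with all of these structures; see the TreeNode article for details.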

Expr

Besides nodes, plan descriptions rely on another very common concept: Expr.

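Put in plain language, an expression is an operation that takes zero or more input arguments and produces a single result. This is also what distinguishes it from a LogicalPlan: an expression evaluates to a value, while a LogicalPlan produces a set of rows.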

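Put another way, apart from keywords, every place in a SQL statement that can be filled in is an expression. The LogicalPlan variants above show that most places that need evaluation use Expr.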

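The Expr enum below also shows that Expr is itself a self-referential (recursive) type: many variants hold child expressions in `Box<Expr>`.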
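A tiny recursive expression tree shows why `Box<Expr>` is needed and how such a tree is evaluated per row. The enum here is a hypothetical, heavily reduced model loosely based on Expr, not DataFusion's type or evaluator. `Value` covers just integers and booleans, NULL is not modeled, and type errors are returned as `Err`.

```rust
/// Reduced expression tree: compound variants box their operands.
#[derive(Debug, Clone)]
enum Expr {
    Column(usize), // index into the current row
    Literal(i64),
    Plus(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    Not(Box<Expr>),
}

#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Bool(bool),
}

impl Expr {
    /// Evaluate against one row: an expression yields a single value per row.
    fn eval(&self, row: &[i64]) -> Result<Value, String> {
        match self {
            Expr::Column(i) => row.get(*i).map(|v| Value::Int(*v)).ok_or(format!("no column {i}")),
            Expr::Literal(v) => Ok(Value::Int(*v)),
            Expr::Plus(l, r) => match (l.eval(row)?, r.eval(row)?) {
                (Value::Int(a), Value::Int(b)) => Ok(Value::Int(a + b)),
                _ => Err("+ expects integers".to_string()),
            },
            Expr::Gt(l, r) => match (l.eval(row)?, r.eval(row)?) {
                (Value::Int(a), Value::Int(b)) => Ok(Value::Bool(a > b)),
                _ => Err("> expects integers".to_string()),
            },
            Expr::Not(e) => match e.eval(row)? {
                Value::Bool(b) => Ok(Value::Bool(!b)),
                _ => Err("NOT expects a boolean".to_string()),
            },
        }
    }
}

fn main() {
    // (col0 + 1) > col1
    let e = Expr::Gt(
        Box::new(Expr::Plus(Box::new(Expr::Column(0)), Box::new(Expr::Literal(1)))),
        Box::new(Expr::Column(1)),
    );
    println!("{:?}", e.eval(&[3, 3]));
}
```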

#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum Expr {
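    /// An expression with an alias (`expr AS name`)
    Alias(Alias),
    /// A reference to a column
    Column(Column),
    /// A scalar variable (e.g. a SQL `@variable`)
    ScalarVariable(DataType, Vec<String>),
    /// A constant value
    Literal(ScalarValue),
    /// A binary operation such as `a + b` or `a > b`
    BinaryExpr(BinaryExpr),
    /// LIKE pattern matching
    Like(Like),
    /// SIMILAR TO pattern matching (regular-expression based)
    SimilarTo(Like),
    /// Logical NOT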
    Not(Box<Expr>),
    /// True if argument is not NULL, false otherwise. This expression itself is never NULL.
    IsNotNull(Box<Expr>),
    /// True if argument is NULL, false otherwise. This expression itself is never NULL.
    IsNull(Box<Expr>),
    /// True if argument is true, false otherwise. This expression itself is never NULL.
    IsTrue(Box<Expr>),
    /// True if argument is  false, false otherwise. This expression itself is never NULL.
    IsFalse(Box<Expr>),
    /// True if argument is NULL, false otherwise. This expression itself is never NULL.
    IsUnknown(Box<Expr>),
    /// True if argument is FALSE or NULL, false otherwise. This expression itself is never NULL.
    IsNotTrue(Box<Expr>),
    /// True if argument is TRUE OR NULL, false otherwise. This expression itself is never NULL.
    IsNotFalse(Box<Expr>),
    /// True if argument is TRUE or FALSE, false otherwise. This expression itself is never NULL.
    IsNotUnknown(Box<Expr>),
    /// arithmetic negation of an expression, the operand must be of a signed numeric data type
    Negative(Box<Expr>),
    /// Whether an expression is between a given range.
    Between(Between),
    /// The CASE expression is similar to a series of nested if/else and there are two forms that
    /// can be used. The first form consists of a series of boolean "when" expressions with
    /// corresponding "then" expressions, and an optional "else" expression.
    ///
    /// ```text
    /// CASE WHEN condition THEN result
    ///      [WHEN ...]
    ///      [ELSE result]
    /// END
    /// ```
    ///
    /// The second form uses a base expression and then a series of "when" clauses that match on a
    /// literal value.
    ///
    /// ```text
    /// CASE expression
    ///     WHEN value THEN result
    ///     [WHEN ...]
    ///     [ELSE result]
    /// END
    /// ```
    Case(Case),
    /// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
    /// This expression is guaranteed to have a fixed type.
    Cast(Cast),
    /// Casts the expression to a given type and will return a null value if the expression cannot be cast.
    /// This expression is guaranteed to have a fixed type.
    TryCast(TryCast),
    /// Represents the call of a scalar function with a set of arguments.
    ScalarFunction(ScalarFunction),
    /// Calls an aggregate function with arguments, and optional
    /// `ORDER BY`, `FILTER`, `DISTINCT` and `NULL TREATMENT`.
    ///
    /// See also [`ExprFunctionExt`] to set these fields.
    ///
    /// [`ExprFunctionExt`]: crate::expr_fn::ExprFunctionExt
    AggregateFunction(AggregateFunction),
    /// Represents the call of a window function with arguments.
    WindowFunction(WindowFunction),
    /// Returns whether the list contains the expr value.
    InList(InList),
    /// EXISTS subquery
    Exists(Exists),
    /// IN subquery
    InSubquery(InSubquery),
    /// Scalar subquery
    ScalarSubquery(Subquery),
    /// Represents a reference to all available fields in a specific schema,
    /// with an optional (schema) qualifier.
    ///
    /// This expr has to be resolved to a list of columns before translating logical
    /// plan into physical plan.
    Wildcard {
        qualifier: Option<TableReference>,
        options: WildcardOptions,
    },
    /// List of grouping set expressions. Only valid in the context of an aggregate
    /// GROUP BY expression list
    GroupingSet(GroupingSet),
    /// A place holder for parameters in a prepared statement
    /// (e.g. `$foo` or `$1`)
    Placeholder(Placeholder),
    /// A place holder which hold a reference to a qualified field
    /// in the outer query, used for correlated sub queries.
    OuterReferenceColumn(DataType, Column),
    /// Unnest expression
    Unnest(Unnest),
}