Analyzer应用FunctionRewriteAnalyzerRuleLogicalPlan,来为其执行做准备工作。例如类型转换来保证操作数的类型和函数的类型相匹配。

#[derive(Clone, Debug)]
pub struct Analyzer {
    /// Expr --> Function writes to apply prior to analysis passes
    pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
    /// All rules to apply
    pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}

Analyzer这个结构体很简单,就是各种规则的汇总,具体的转换逻辑在这些规则的实现当中。Analyzer则是负责管理整个流程,只要将实现的规则在这里进行注册就能够进行扩展了。

两类重写trait的定义如下,非常简洁。

pub trait FunctionRewrite: Debug {
    fn name(&self) -> &str;
 
    fn rewrite(
        &self,
        expr: Expr,
        schema: &DFSchema,
        config: &ConfigOptions,
    ) -> Result<Transformed<Expr>>;
}
 
pub trait AnalyzerRule: Debug {
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
 
    fn name(&self) -> &str;
}

下面先看Analyzer的应用流程,即execute_and_check方法的实现逻辑

pub fn execute_and_check<F>(
	&self,
	plan: LogicalPlan,
	config: &ConfigOptions,
	mut observer: F,
) -> Result<LogicalPlan>
where
	F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
	...
}

Analyzer流程

1. 前置检查

plan.check_invariants(InvariantLevel::Always)
	.map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;

在开始应用规则之前做的一次检查,要是输入的LogicalPlan就不符合要求直接在入口检测到错误。比如确保整个LogicalPlanSchema不存在重复的字段。

2. AnalyzeRule链拼接

let expr_to_function: Option<Arc<dyn AnalyzerRule + Send + Sync>> =
	if self.function_rewrites.is_empty() {
		None
	} else {
		Some(Arc::new(ApplyFunctionRewrites::new(
			self.function_rewrites.clone(),
		)))
	};
let rules = expr_to_function.iter().chain(self.rules.iter());

将所有的function_rewrites规则包装为AnalyzerRule trait的实现ApplyFunctionRewrites。并将这个规则放在其他规则之前。 这是为了让function_rewrites优先将输入的表达式转为规范的函数调用形式,因为后续的规则中可能有类型转换改变参数的类型,如果function_rewrites放在后面执行的话就会导致错过原本应该进行的重写,以及应用了错误的重写,导致非预期的逻辑。

因此放在最前面来为不同的语法糖等形态转为统一的规范输入。

3. 应用规则

就是按序遍历规则并应用,生成新的LogicalPlan

for rule in rules {
	new_plan = rule
		.analyze(new_plan, config)
		.map_err(|e| e.context(rule.name()))?;
	log_plan(rule.name(), &new_plan);
	observer(&new_plan, rule.as_ref());
}

4. 后置检查

在所有规则应用之后,对最终的LogicalPlan再做一次检查,这次是查询最后的结果是否满足可运行的要求。比如不再存在未解析或者占位的结构,语义满足约束。

new_plan
	.check_invariants(InvariantLevel::Executable)
	.map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?;

AnalyzerRule示例

TypeCoercion

通过schema和表达式重写来做类型强制转换。

impl AnalyzerRule for TypeCoercion {
    fn name(&self) -> &str {
        "type_coercion"
    }
 
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
        let empty_schema = DFSchema::empty();
 
        // recurse
        let transformed_plan = plan
            .transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
            .data;
 
        // 按照配置将最终的转换结果的类型再规范化一下
        coerce_output(transformed_plan, config)
    }
}

主要的转换在递归执行analyze_internal完成。

递归逻辑为:外层用一个空 schema 从上向下递归遍历;当遇到表达式中的子查询时,会在 TypeCoercionRewriter 内部再调用 analyze_internal(self.schema, subquery),把“当前节点的 schema”作为子查询的 external_schema 传入,从而支持相关子查询的外层列引用。

看下analyze_internal的实现

fn analyze_internal(
    external_schema: &DFSchema,
    plan: LogicalPlan,
) -> Result<Transformed<LogicalPlan>> {
    // 将当前节点的所有input的schema合并
    let mut schema = merge_schema(&plan.inputs());
	// 当前节点是TableScan那么就没有input了,直接使用表本身的schema
    if let LogicalPlan::TableScan(ts) = &plan {
        let source_schema = DFSchema::try_from_qualified_schema(
            ts.table_name.clone(),
            &ts.source.schema(),
        )?;
        schema.merge(&source_schema);
    }
 
    // external_schema是当前关联的子查询的schema,也合并
    schema.merge(external_schema);
 
    // 将过滤器谓词强制转为布尔类型
    let plan = if let LogicalPlan::Filter(mut filter) = plan {
        filter.predicate = filter.predicate.cast_to(&DataType::Boolean, &schema)?;
        LogicalPlan::Filter(filter)
    } else {
        plan
    };
 
	// 用合并后的 schema 构造 `TypeCoercionRewriter`
    let mut expr_rewrite = TypeCoercionRewriter::new(&schema);
	// 借助 `NamePreserver` 保存并恢复别名/输出列名,避免因重写导致命名抖动。
    let name_preserver = NamePreserver::new(&plan);
    // 在整棵节点上逐个表达式做类型强制
    plan.map_expressions(|expr| {
        let original_name = name_preserver.save(&expr);
        expr.rewrite(&mut expr_rewrite)
            .map(|transformed| transformed.update_data(|e| original_name.restore(e)))
    })?
    // 某些算子需要在表达式强制之后做“节点级”的补充强制
	    // Join:按对(left_expr, right_expr)对齐比较类型,且 Join filter 强制为 Boolean;
		// Union:把所有输入投射到一个兼容的统一 schema;
		// Limit:把 LIMIT/OFFSET 强制为 Int64
    .map_data(|plan| expr_rewrite.coerce_plan(plan))?
    // 因为表达式的类型可能变化(比如插入 cast),需要在末尾对当前节点的 schema 做一次重算,以保持计划的类型一致性。
    .map_data(|plan| plan.recompute_schema())
}

类型转换的逻辑主要在TypeCoercionRewriter,就是简单的改写让类型符合预期,比如让IsNotTrue里面的expr类型要为布尔类型,将内部的expr重写为cast包装的expr

Expr::IsNotTrue(expr) => Ok(Transformed::yes(is_not_true(
	get_casted_expr_for_bool_op(*expr, self.schema)?,
))),

ResolveGroupingFunction

用来“消解”SQL 标准里的 GROUPING() 聚合函数:把出现在 Aggregate 节点中的 GROUPING(exprs…) 改写为基于内部分组位图(internal grouping id)计算出的普通表达式,并将真正的 Aggregate 节点里的聚合列表中去掉这些 GROUPING() 调用。改写后,Aggregate 只保留真实的聚合函数,GROUPING() 的结果通过上层一个 Projection 用位运算从内部分组位图计算得到。

整体入口

  • 规则入口直接在计划树上自底向上 transform,遇到 Aggregate 且其 aggr_expr 中包含 GROUPING() 时触发改写
impl AnalyzerRule for ResolveGroupingFunction {
    fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
        plan.transform_up(analyze_internal).data()
    }
 

改写触发条件与流程

  • 遍历子查询后,匹配包含 GROUPING() 的 Aggregate 节点,调用 replace_grouping_exprs 做实际改写:
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
    // rewrite any subqueries in the plan first
    let transformed_plan =
        plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
 
    let transformed_plan = transformed_plan.transform_data(|plan| match plan {
    // 匹配到包含group的agg节点
        LogicalPlan::Aggregate(Aggregate {
            input,
            group_expr,
            aggr_expr,
            schema,
            ..
        }) if contains_grouping_function(&aggr_expr) => Ok(Transformed::yes(
            replace_grouping_exprs(input, schema, group_expr, aggr_expr)?,
        )),
        _ => Ok(Transformed::no(plan)),
    })?;
 
    Ok(transformed_plan)
}
 

改写的逻辑在replace_grouping_exprs中,把aggr_expr当中出现的grouping(...)特殊聚合函数从聚合节点当中删除,改为聚合之后用一个外层Projection来计算,这样使得

  • Aggregate只负责普通的聚合
  • grouping(...)的值通过内部的分组id位图计算出来,语义正确