Analyzer应用FunctionRewrite和AnalyzerRule到LogicalPlan,来为其执行做准备工作。例如类型转换来保证操作数的类型和函数的类型相匹配。
#[derive(Clone, Debug)]
pub struct Analyzer {
/// Expr --> Function writes to apply prior to analysis passes
pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
/// All rules to apply
pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}Analyzer这个结构体很简单,就是各种规则的汇总,具体的转换逻辑在这些规则的实现当中。Analyzer则是负责管理整个流程,只要将实现的规则在这里进行注册就能够进行扩展了。
两类重写trait的定义如下,非常简洁。
pub trait FunctionRewrite: Debug {
fn name(&self) -> &str;
fn rewrite(
&self,
expr: Expr,
schema: &DFSchema,
config: &ConfigOptions,
) -> Result<Transformed<Expr>>;
}
pub trait AnalyzerRule: Debug {
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
fn name(&self) -> &str;
}下面先看Analyzer的应用流程,即execute_and_check方法的实现逻辑
pub fn execute_and_check<F>(
&self,
plan: LogicalPlan,
config: &ConfigOptions,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
...
}Analyzer流程
1. 前置检查
plan.check_invariants(InvariantLevel::Always)
.map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;在开始应用规则之前做的一次检查,要是输入的LogicalPlan就不符合要求直接在入口检测到错误。比如确保整个LogicalPlan的Schema不存在重复的字段。
2. AnalyzeRule链拼接
let expr_to_function: Option<Arc<dyn AnalyzerRule + Send + Sync>> =
if self.function_rewrites.is_empty() {
None
} else {
Some(Arc::new(ApplyFunctionRewrites::new(
self.function_rewrites.clone(),
)))
};
let rules = expr_to_function.iter().chain(self.rules.iter());将所有的function_rewrites规则包装为AnalyzerRule trait的实现ApplyFunctionRewrites。并将这个规则放在其他规则之前。
这是为了让function_rewrites优先将输入的表达式转为规范的函数调用形式,因为后续的规则中可能有类型转换改变参数的类型,如果function_rewrites放在后面执行的话就会导致错过原本应该进行的重写,以及应用了错误的重写,导致非预期的逻辑。
因此放在最前面来为不同的语法糖等形态转为统一的规范输入。
3. 应用规则
就是按序遍历规则并应用,生成新的LogicalPlan
for rule in rules {
new_plan = rule
.analyze(new_plan, config)
.map_err(|e| e.context(rule.name()))?;
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}4. 后置检查
在所有规则应用之后,对最终的LogicalPlan再做一次检查,这次是查询最后的结果是否满足可运行的要求。比如不再存在未解析或者占位的结构,语义满足约束。
new_plan
.check_invariants(InvariantLevel::Executable)
.map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?;AnalyzerRule示例
TypeCoercion
通过schema和表达式重写来做类型强制转换。
impl AnalyzerRule for TypeCoercion {
fn name(&self) -> &str {
"type_coercion"
}
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
let empty_schema = DFSchema::empty();
// recurse
let transformed_plan = plan
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
.data;
// 按照配置将最终的转换结果的类型再规范化一下
coerce_output(transformed_plan, config)
}
}主要的转换在递归执行analyze_internal完成。
递归逻辑为:外层用一个空 schema 从上向下递归遍历;当遇到表达式中的子查询时,会在 TypeCoercionRewriter 内部再调用 analyze_internal(self.schema, subquery),把“当前节点的 schema”作为子查询的 external_schema 传入,从而支持相关子查询的外层列引用。
看下analyze_internal的实现
fn analyze_internal(
external_schema: &DFSchema,
plan: LogicalPlan,
) -> Result<Transformed<LogicalPlan>> {
// 将当前节点的所有input的schema合并
let mut schema = merge_schema(&plan.inputs());
// 当前节点是TableScan那么就没有input了,直接使用表本身的schema
if let LogicalPlan::TableScan(ts) = &plan {
let source_schema = DFSchema::try_from_qualified_schema(
ts.table_name.clone(),
&ts.source.schema(),
)?;
schema.merge(&source_schema);
}
// external_schema是当前关联的子查询的schema,也合并
schema.merge(external_schema);
// 将过滤器谓词强制转为布尔类型
let plan = if let LogicalPlan::Filter(mut filter) = plan {
filter.predicate = filter.predicate.cast_to(&DataType::Boolean, &schema)?;
LogicalPlan::Filter(filter)
} else {
plan
};
// 用合并后的 schema 构造 `TypeCoercionRewriter`
let mut expr_rewrite = TypeCoercionRewriter::new(&schema);
// 借助 `NamePreserver` 保存并恢复别名/输出列名,避免因重写导致命名抖动。
let name_preserver = NamePreserver::new(&plan);
// 在整棵节点上逐个表达式做类型强制
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr);
expr.rewrite(&mut expr_rewrite)
.map(|transformed| transformed.update_data(|e| original_name.restore(e)))
})?
// 某些算子需要在表达式强制之后做“节点级”的补充强制
// Join:按对(left_expr, right_expr)对齐比较类型,且 Join filter 强制为 Boolean;
// Union:把所有输入投射到一个兼容的统一 schema;
// Limit:把 LIMIT/OFFSET 强制为 Int64
.map_data(|plan| expr_rewrite.coerce_plan(plan))?
// 因为表达式的类型可能变化(比如插入 cast),需要在末尾对当前节点的 schema 做一次重算,以保持计划的类型一致性。
.map_data(|plan| plan.recompute_schema())
}类型转换的逻辑主要在TypeCoercionRewriter,就是简单的改写让类型符合预期,比如让IsNotTrue里面的expr类型要为布尔类型,将内部的expr重写为cast包装的expr
Expr::IsNotTrue(expr) => Ok(Transformed::yes(is_not_true(
get_casted_expr_for_bool_op(*expr, self.schema)?,
))),ResolveGroupingFunction
用来“消解”SQL 标准里的 GROUPING() 聚合函数:把出现在 Aggregate 节点中的 GROUPING(exprs…) 改写为基于内部分组位图(internal grouping id)计算出的普通表达式,并将真正的 Aggregate 节点里的聚合列表中去掉这些 GROUPING() 调用。改写后,Aggregate 只保留真实的聚合函数,GROUPING() 的结果通过上层一个 Projection 用位运算从内部分组位图计算得到。
整体入口
- 规则入口直接在计划树上自底向上 transform,遇到 Aggregate 且其 aggr_expr 中包含 GROUPING() 时触发改写
impl AnalyzerRule for ResolveGroupingFunction {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
plan.transform_up(analyze_internal).data()
}
改写触发条件与流程
- 遍历子查询后,匹配包含 GROUPING() 的 Aggregate 节点,调用 replace_grouping_exprs 做实际改写:
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
// rewrite any subqueries in the plan first
let transformed_plan =
plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
let transformed_plan = transformed_plan.transform_data(|plan| match plan {
// 匹配到包含group的agg节点
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
schema,
..
}) if contains_grouping_function(&aggr_expr) => Ok(Transformed::yes(
replace_grouping_exprs(input, schema, group_expr, aggr_expr)?,
)),
_ => Ok(Transformed::no(plan)),
})?;
Ok(transformed_plan)
}
改写的逻辑在replace_grouping_exprs中,把aggr_expr当中出现的grouping(...)特殊聚合函数从聚合节点当中删除,改为聚合之后用一个外层Projection来计算,这样使得
Aggregate只负责普通的聚合grouping(...)的值通过内部的分组id位图计算出来,语义正确