The TreeNode article introduced how tree structures are processed, and the LogicalPlan article covered each LogicalPlan node. From the architecture we know that a LogicalPlan is rewritten by AnalyzerRules and OptimizerRules; here we look at how that rewriting is done.

First, two things to be clear about:

  • The Analyzer is responsible for applying AnalyzerRules. It runs before the OptimizerRules and ensures the plan is valid for the subsequent OptimizerRules.
  • The Optimizer is responsible for applying OptimizerRules, turning the LogicalPlan into an equivalent but more efficient plan.

The Analyzer and the Optimizer each traverse the LogicalPlan tree and rewrite it.

These two steps can be viewed as the two phases of logical optimization.

Analyzer

pub trait AnalyzerRule: Debug {
    /// Rewrite `plan`
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
 
    /// A human readable name for this analyzer rule
    fn name(&self) -> &str;
}

The AnalyzerRule trait is simple: it takes a LogicalPlan and rewrites it into a new LogicalPlan.
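
To see the shape concretely, here is a minimal, self-contained sketch of the same pattern with toy types. `Plan` and `RemoveTrueFilter` are invented stand-ins, and the real trait additionally takes a `ConfigOptions` and returns a `Result`:

```rust
use std::fmt::Debug;

// Toy stand-in for LogicalPlan; DataFusion's real type is far richer.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter { predicate: String, input: Box<Plan> },
}

// Same shape as AnalyzerRule, minus ConfigOptions and Result.
trait AnalyzerRule: Debug {
    fn analyze(&self, plan: Plan) -> Plan;
    fn name(&self) -> &str;
}

// A hypothetical rule: drop filters whose predicate is the literal `true`.
#[derive(Debug)]
struct RemoveTrueFilter;

impl AnalyzerRule for RemoveTrueFilter {
    fn analyze(&self, plan: Plan) -> Plan {
        match plan {
            // Filter on `true` is a no-op: replace it with its rewritten input
            Plan::Filter { predicate, input } if predicate == "true" => self.analyze(*input),
            // Otherwise keep the filter but recurse into its input
            Plan::Filter { predicate, input } => Plan::Filter {
                predicate,
                input: Box::new(self.analyze(*input)),
            },
            other => other,
        }
    }

    fn name(&self) -> &str {
        "remove_true_filter"
    }
}

fn main() {
    let plan = Plan::Filter {
        predicate: "true".into(),
        input: Box::new(Plan::Scan("t".into())),
    };
    assert_eq!(RemoveTrueFilter.analyze(plan), Plan::Scan("t".into()));
}
```

The rule consumes the plan by value and returns a new one, which is exactly how the real `analyze(&self, plan: LogicalPlan, ...)` works.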

The Analyzer itself is just a collection of rules, responsible for applying them to the incoming LogicalPlan. It also carries a second collection of function-rewrite rules, which we will not look at here.

#[derive(Clone, Debug)]
pub struct Analyzer {
    /// Expr --> Function rewrites to apply prior to analysis passes
    pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
    /// All rules to apply
    pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}

The Analyzer's execution flow:

pub fn execute_and_check<F>(
	&self,
	plan: LogicalPlan,
	config: &ConfigOptions,
	mut observer: F,
) -> Result<LogicalPlan>
where
	F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
	// Record the start time
	let start_time = Instant::now();
	// Ownership is moved here; the old plan can be considered gone
	let mut new_plan = plan;
 
	// Assemble the expression-to-function rewrite rule
	let expr_to_function: Option<Arc<dyn AnalyzerRule + Send + Sync>> =
		if self.function_rewrites.is_empty() {
			None
		} else {
			Some(Arc::new(ApplyFunctionRewrites::new(
				self.function_rewrites.clone(),
			)))
		};
	// Chain it in front of the analyzer rules: the expression rewrites must run first
	// because they depend on the input argument types
	let rules = expr_to_function.iter().chain(self.rules.iter());
 
	// TODO add common rule executor for Analyzer and Optimizer
	// Apply the analyzer rules one by one
	for rule in rules {
		new_plan = rule.analyze(new_plan, config).map_err(|e| {
			DataFusionError::Context(rule.name().to_string(), Box::new(e))
		})?;
		log_plan(rule.name(), &new_plan);
		observer(&new_plan, rule.as_ref());
	}
	// After all rules have been applied, validate the plan and raise an error if it is invalid
	check_plan(&new_plan).map_err(|e| {
		DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
	})?;
	log_plan("Final analyzed plan", &new_plan);
	debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
	Ok(new_plan)
}
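
The `expr_to_function.iter().chain(self.rules.iter())` trick works because `Option::iter` yields zero or one item, so the optional function-rewrite rule is simply prepended when present. A standalone illustration of that pattern (rule names here are just labels):

```rust
fn main() {
    let extra: Option<&str> = Some("apply_function_rewrites");
    let rules = vec!["inline_table_scan", "type_coercion"];

    // Option::iter yields 0 or 1 element, so chaining it prepends the
    // optional rule without any special-casing.
    let ordered: Vec<&str> = extra.iter().chain(rules.iter()).copied().collect();
    assert_eq!(
        ordered,
        ["apply_function_rewrites", "inline_table_scan", "type_coercion"]
    );

    // With None, the chain is just the base rules.
    let none: Option<&str> = None;
    let ordered: Vec<&str> = none.iter().chain(rules.iter()).copied().collect();
    assert_eq!(ordered, ["inline_table_scan", "type_coercion"]);
}
```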

The current default Analyzer rules are the following:

pub fn new() -> Self {
	let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
		Arc::new(InlineTableScan::new()),
		// Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule].
		Arc::new(ExpandWildcardRule::new()),
		// [Expr::Wildcard] should be expanded before [TypeCoercion]
		Arc::new(ResolveGroupingFunction::new()),
		Arc::new(TypeCoercion::new()),
		Arc::new(CountWildcardRule::new()),
	];
	Self::with_rules(rules)
}
  • InlineTableScan: if a subquery has no filter, the intermediate subquery can be removed and the scan reads directly from the subquery's source table, effectively dropping one level of intermediate table.
  • ExpandWildcardRule: expands wildcards
  • ResolveGroupingFunction: replaces grouping functions with internal group ids
  • TypeCoercion: coerces expression types
  • CountWildcardRule: rewrites count(*) to count over a constant
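
To make ExpandWildcardRule concrete: conceptually it replaces `*` in the projection with the columns of the input schema. A toy version of that idea (not DataFusion's implementation, which works on `Expr` trees):

```rust
// Toy wildcard expansion: replace "*" with the input schema's columns.
fn expand_wildcard(schema: &[&str], projection: &[&str]) -> Vec<String> {
    projection
        .iter()
        .flat_map(|expr| {
            if *expr == "*" {
                // The wildcard expands to every column of the schema, in order
                schema.iter().map(|c| c.to_string()).collect::<Vec<_>>()
            } else {
                vec![expr.to_string()]
            }
        })
        .collect()
}

fn main() {
    let schema = ["id", "name", "age"];
    assert_eq!(expand_wildcard(&schema, &["*"]), ["id", "name", "age"]);
    assert_eq!(expand_wildcard(&schema, &["id", "*"]), ["id", "id", "name", "age"]);
}
```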

Optimizer

After the Analyzer comes the Optimizer. The logic of the two is essentially the same: apply the registered rules to the incoming LogicalPlan.

pub struct Optimizer {
    /// All optimizer rules to apply
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

The input and output are again a LogicalPlan; all of the interesting logic lives in the rules.

First, the implementation of how the Optimizer applies its registered rules:

impl Optimizer {
    /// Optimizes the logical plan by applying optimizer rules, and
    /// invoking observer function after each call
    pub fn optimize<F>(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
    {
        let start_time = Instant::now();
        let options = config.options();
        let mut new_plan = plan;
        // Record the plan's optimization history
        let mut previous_plans = HashSet::with_capacity(16);
        previous_plans.insert(LogicalPlanSignature::new(&new_plan));
        // Loop over passes; the maximum number of passes is configurable
        let mut i = 0;
        while i < options.optimizer.max_passes {
            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
 
            for rule in &self.rules {
                // If skipping failed rules, copy plan before attempting to rewrite
                // as rewriting is destructive
                let prev_plan = options
                    .optimizer
                    .skip_failed_rules
                    .then(|| new_plan.clone());
 
                let starting_schema = Arc::clone(new_plan.schema());
                // Apply the rule to rewrite the plan
                let result = match rule.apply_order() {
                    // optimizer handles recursion
                    Some(apply_order) => new_plan.rewrite_with_subqueries(
                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
                    ),
                    // rule handles recursion itself
                    None => optimize_plan_node(new_plan, rule.as_ref(), config),
                }
                // Verify the schema is unchanged by the rewrite
                .and_then(|tnr| {
                    assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
                    Ok(tnr)
                });
 
                // Handle results
                match (result, prev_plan) {
                    // OptimizerRule was successful
                    (
                        Ok(Transformed {
                            data, transformed, ..
                        }),
                        _,
                    ) => {
                        // On success, notify the observer and log some information
                        new_plan = data;
                        observer(&new_plan, rule.as_ref());
                        if transformed {
                            log_plan(rule.name(), &new_plan);
                        } else {
                            debug!(
                                "Plan unchanged by optimizer rule '{}' (pass {})",
                                rule.name(),
                                i
                            );
                        }
                    }
                    // OptimizerRule was unsuccessful, but skipped failed rules is on
                    // so use the previous plan
                    // On failure, the configuration decides whether the rule is skipped
                    (Err(e), Some(orig_plan)) => {
                        // Note to future readers: if you see this warning it signals a
                        // bug in the DataFusion optimizer. Please consider filing a ticket
                        // https://github.com/apache/datafusion
                        warn!(
                            "Skipping optimizer rule '{}' due to unexpected error: {}",
                            rule.name(),
                            e
                        );
                        new_plan = orig_plan;
                    }
                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
                    (Err(e), None) => {
                        return Err(e.context(format!(
                            "Optimizer rule '{}' failed",
                            rule.name()
                        )));
                    }
                }
            }
            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
 
            // HashSet::insert returns, whether the value was newly inserted.
            // Insert the new plan's signature into the history
            let plan_is_fresh =
                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
            if !plan_is_fresh {
                // plan did not change, so no need to continue trying to optimize
                debug!("optimizer pass {} did not make changes", i);
                break;
            }
            i += 1;
        }
        log_plan("Final optimized plan", &new_plan);
        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
        Ok(new_plan)
    }
}
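
Stripped of DataFusion's types, the driver above is a bounded fixed-point loop: apply every rule, record a signature of the result, and stop early once a pass produces a plan already seen. A self-contained sketch of that pattern, with a number standing in for the plan and its signature:

```rust
use std::collections::HashSet;

fn main() {
    // Toy "plan": a number; toy "rules": shrinking transforms.
    let rules: Vec<fn(u64) -> u64> = vec![|p| p / 2, |p| p.saturating_sub(1)];
    let max_passes = 10;

    let mut plan: u64 = 100;
    let mut seen = HashSet::new();
    seen.insert(plan);

    let mut passes = 0;
    for i in 0..max_passes {
        // One pass: apply every rule in order
        for rule in &rules {
            plan = rule(plan);
        }
        passes = i + 1;
        // HashSet::insert returns false if this signature was already present,
        // i.e. the pass made no progress: stop early.
        if !seen.insert(plan) {
            break;
        }
    }
    assert_eq!(plan, 0); // reached a fixed point...
    assert!(passes < max_passes); // ...before exhausting max_passes
}
```

The real loop hashes a `LogicalPlanSignature` instead of the plan itself, so "no progress" is detected cheaply without deep plan comparison.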

So the core logic is this snippet:

let result = match rule.apply_order() {
	// optimizer handles recursion
	Some(apply_order) => new_plan.rewrite_with_subqueries(
		&mut Rewriter::new(apply_order, rule.as_ref(), config),
	),
	// rule handles recursion itself
	None => optimize_plan_node(new_plan, rule.as_ref(), config),
}
// Verify the schema is unchanged by the rewrite
.and_then(|tnr| {
	assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
	Ok(tnr)
});

The two match arms differ only in who drives the recursion: with an apply_order, the optimizer walks the tree (including subqueries) itself; without one, the rule handles the recursion internally. Either way, each node ends up in the function below:

fn optimize_plan_node(
    plan: LogicalPlan,
    rule: &dyn OptimizerRule,
    config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
    if rule.supports_rewrite() {
        return rule.rewrite(plan, config);
    }
 
    #[allow(deprecated)]
    rule.try_optimize(&plan, config).map(|maybe_plan| {
        match maybe_plan {
            Some(new_plan) => {
                // if the node was rewritten by the optimizer, replace the node
                Transformed::yes(new_plan)
            }
            None => Transformed::no(plan),
        }
    })
}
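
This function bridges the old and new rule APIs: `rewrite` returns a `Transformed` value carrying the plan plus a flag, while the deprecated `try_optimize` returns an `Option`. The mapping between the two is mechanical; roughly (simplified stand-in types, not DataFusion's):

```rust
// Simplified stand-in for DataFusion's Transformed<T> wrapper.
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self { Transformed { data, transformed: true } }
    fn no(data: T) -> Self { Transformed { data, transformed: false } }
}

// The Option-based legacy result maps onto Transformed exactly as in
// optimize_plan_node: Some(new) => yes(new), None => no(original).
fn from_legacy<T>(original: T, maybe_new: Option<T>) -> Transformed<T> {
    match maybe_new {
        Some(new_plan) => Transformed::yes(new_plan),
        None => Transformed::no(original),
    }
}

fn main() {
    let t = from_legacy("old", Some("new"));
    assert!(t.transformed && t.data == "new");

    let u = from_legacy("old", None);
    assert!(!u.transformed && u.data == "old");
}
```

Keeping the `transformed` flag lets the driver log whether a rule actually changed anything, which a bare plan value could not convey.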

You can implement your own optimization rules, and DataFusion itself also ships some defaults, added when the Optimizer is created:

pub fn new() -> Self {
	let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
		// Collapse nested unions into a single union
		Arc::new(EliminateNestedUnion::new()),
		// Simplify expressions, e.g. b > 2 AND b > 2 becomes b > 2
		Arc::new(SimplifyExpressions::new()),
		// Remove unnecessary casts, e.g. if c1 is INT32, cast(c1 as INT64) > INT64(10) becomes c1 > 10
		Arc::new(UnwrapCastInComparison::new()),
		// Replace DISTINCT with an aggregation
		Arc::new(ReplaceDistinctWithAggregate::new()),
		// Eliminate a join whose condition is false; replace an inner join whose condition is true with a cross join
		Arc::new(EliminateJoin::new()),
		// Rewrite IN/EXISTS subqueries as semi/anti joins
		Arc::new(DecorrelatePredicateSubquery::new()),
		// Turn scalar subqueries used as filter conditions into joins
		Arc::new(ScalarSubqueryToJoin::new()),
		// Split conjunctive join predicates (a join condition with several ANDed parts) into equijoin predicates and the rest
		Arc::new(ExtractEquijoinPredicate::new()),
		// Remove duplicated expressions
		Arc::new(EliminateDuplicatedExpr::new()),
		// Remove unnecessary filters
		Arc::new(EliminateFilter::new()),
		// Remove unnecessary cross joins
		Arc::new(EliminateCrossJoin::new()),
		// Common subexpression elimination: an expression used several times is hoisted out and computed only once
		Arc::new(CommonSubexprEliminate::new()),
		// Remove unnecessary limits
		Arc::new(EliminateLimit::new()),
		Arc::new(PropagateEmptyRelation::new()),
		// Must be after PropagateEmptyRelation
		// Eliminate unions with a single input
		Arc::new(EliminateOneUnion::new()),
		// If an equijoin key is nullable, insert an IS NOT NULL filter, otherwise NULL keys can never match
		Arc::new(FilterNullJoinKeys::default()),
		// Try to convert outer joins into inner joins
		Arc::new(EliminateOuterJoin::new()),
		// Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
		// Push down limits
		Arc::new(PushDownLimit::new()),
		// Push down filters
		Arc::new(PushDownFilter::new()),
		Arc::new(SingleDistinctToGroupBy::new()),
		// The previous optimizations added expressions and projections,
		// that might benefit from the following rules,
		// so some earlier rules are rerun here
		Arc::new(SimplifyExpressions::new()),
		Arc::new(UnwrapCastInComparison::new()),
		Arc::new(CommonSubexprEliminate::new()),
		Arc::new(EliminateGroupByConstant::new()),
		Arc::new(OptimizeProjections::new()),
	];
 
	Self::with_rules(rules)
}

Whether a rule rewrites nodes into brand-new ones or modifies them in place does not really matter here. Let's walk through one rule's rewrite flow with a concrete example: turning a cross join into an inner join.

fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        let plan_schema = Arc::clone(plan.schema());
        let mut possible_join_keys = JoinKeySet::new();
        let mut all_inputs: Vec<LogicalPlan> = vec![];
        let mut all_filters: Vec<Expr> = vec![];
 
        let parent_predicate = if let LogicalPlan::Filter(filter) = plan {
            // if input isn't a join that can potentially be rewritten
            // avoid unwrapping the input
            // The current node is a Filter whose input is a Join, so it may be rewritable.
            // Note that cross joins and inner joins are both represented with JoinType::Inner here.
            let rewriteable = matches!(
                filter.input.as_ref(),
                LogicalPlan::Join(Join {
                    join_type: JoinType::Inner,
                    ..
                })
            );
            // If it cannot be rewritten, just recurse into the children
            if !rewriteable {
                // recursively try to rewrite children
                return rewrite_children(self, LogicalPlan::Filter(filter), config);
            }
            // Check whether the join inputs can be flattened: any child that is itself a join must also be an inner/cross join
            if !can_flatten_join_inputs(&filter.input) {
                return Ok(Transformed::no(LogicalPlan::Filter(filter)));
            }
            // Destructure the filter and flatten its join inputs
            let Filter {
                input, predicate, ..
            } = filter;
            flatten_join_inputs(
                Arc::unwrap_or_clone(input),
                &mut possible_join_keys,
                &mut all_inputs,
                &mut all_filters,
            )?;
            // Extract candidate join keys from the WHERE predicate
            extract_possible_join_keys(&predicate, &mut possible_join_keys);
            Some(predicate)
        } else if matches!(
            plan,
            LogicalPlan::Join(Join {
                join_type: JoinType::Inner,
                ..
            })
        ) {
            // Handle a bare inner-join node
            if !can_flatten_join_inputs(&plan) {
                return Ok(Transformed::no(plan));
            }
            flatten_join_inputs(
                plan,
                &mut possible_join_keys,
                &mut all_inputs,
                &mut all_filters,
            )?;
            None
        } else {
            // recursively try to rewrite children
            return rewrite_children(self, plan, config);
        };
 
        // Recombine all the inputs into inner joins
        let mut all_join_keys = JoinKeySet::new();
        let mut left = all_inputs.remove(0);
        while !all_inputs.is_empty() {
            left = find_inner_join(
                left,
                &mut all_inputs,
                &possible_join_keys,
                &mut all_join_keys,
            )?;
        }
 
        left = rewrite_children(self, left, config)?.data;
 
        if &plan_schema != left.schema() {
            left = LogicalPlan::Projection(Projection::new_from_schema(
                Arc::new(left),
                Arc::clone(&plan_schema),
            ));
        }
 
        if !all_filters.is_empty() {
            // Add any filters on top - PushDownFilter can push filters down to applicable join
            let first = all_filters.swap_remove(0);
            let predicate = all_filters.into_iter().fold(first, and);
            left = LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(left))?);
        }
 
        let Some(predicate) = parent_predicate else {
            return Ok(Transformed::yes(left));
        };
 
        // No keys usable for a join: keep the whole predicate as a filter
        if all_join_keys.is_empty() {
            Filter::try_new(predicate, Arc::new(left))
                .map(|filter| Transformed::yes(LogicalPlan::Filter(filter)))
        } else {
            // Predicates already used as join conditions are removed from the filter
            match remove_join_expressions(predicate, &all_join_keys) {
                Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left))
                    .map(|filter| Transformed::yes(LogicalPlan::Filter(filter))),
                _ => Ok(Transformed::yes(left)),
            }
        }
    }

The basis for turning a cross join into an inner join is that some predicates can serve as join conditions. The rule therefore collects the filter predicates and the tables involved, and turns the cross join into an inner join with proper join conditions.
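
The key step is classifying the WHERE predicates: column-to-column equalities become join keys, and everything else stays as a residual filter. A toy version of that split (plain tuples instead of DataFusion's Expr; names are invented for illustration):

```rust
// Toy predicate: either an equality between two columns, or anything else.
enum Pred {
    ColEq(&'static str, &'static str),
    Other(&'static str),
}

// Split a conjunction into equijoin keys and residual filter predicates,
// mirroring the idea behind extract_possible_join_keys.
fn split_predicates(
    preds: Vec<Pred>,
) -> (Vec<(&'static str, &'static str)>, Vec<&'static str>) {
    let mut keys = Vec::new();
    let mut rest = Vec::new();
    for p in preds {
        match p {
            Pred::ColEq(l, r) => keys.push((l, r)),
            Pred::Other(f) => rest.push(f),
        }
    }
    (keys, rest)
}

fn main() {
    // WHERE t1.id = t2.id AND t2.v > 10   on   t1 CROSS JOIN t2
    let (keys, rest) = split_predicates(vec![
        Pred::ColEq("t1.id", "t2.id"),
        Pred::Other("t2.v > 10"),
    ]);
    // t1.id = t2.id becomes the INNER JOIN condition; t2.v > 10 stays a filter.
    assert_eq!(keys, [("t1.id", "t2.id")]);
    assert_eq!(rest, ["t2.v > 10"]);
}
```

With no equality predicates at all, `keys` stays empty and the plan keeps its cross join, just as the rule returns the original filter when `all_join_keys` is empty.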