The TreeNode article covered how tree structures are processed, and the LogicalPlan article covered the individual nodes of a LogicalPlan. From the architecture overview we know that a LogicalPlan is rewritten by AnalyzerRules and OptimizerRules; here we look at how that rewriting is done.
First, two points to keep clear:
- The Analyzer is responsible for applying the AnalyzerRules. It runs before the OptimizerRules and ensures the plan is valid for the subsequent OptimizerRules.
- The Optimizer is responsible for applying the OptimizerRules, rewriting the LogicalPlan into an equivalent but more efficient plan.

Both the analyzer and the optimizer traverse the LogicalPlan tree and rewrite it. They can be seen as the two phases of logical optimization.
Analyzer
pub trait AnalyzerRule: Debug {
/// Rewrite `plan`
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
/// A human readable name for this analyzer rule
fn name(&self) -> &str;
}

The AnalyzerRule trait is simple: it takes a LogicalPlan and rewrites it into a new LogicalPlan.
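The pattern is easy to reproduce in miniature. Below is a sketch of the same shape with a toy plan type (a plain String) instead of LogicalPlan; the names `Rule`, `UppercaseRule`, and `apply_rules` are illustrative stand-ins, not DataFusion APIs:

```rust
// A minimal sketch of the AnalyzerRule pattern, assuming a toy String plan.
trait Rule {
    /// Rewrite `plan`, taking ownership and returning a new plan.
    fn analyze(&self, plan: String) -> Result<String, String>;
    /// A human readable name for this rule.
    fn name(&self) -> &str;
}

// A hypothetical rule used only for illustration.
struct UppercaseRule;

impl Rule for UppercaseRule {
    fn analyze(&self, plan: String) -> Result<String, String> {
        Ok(plan.to_uppercase())
    }
    fn name(&self) -> &str {
        "uppercase"
    }
}

// Apply each rule in order, threading ownership of the plan through, and
// attach the failing rule's name as error context -- the same overall shape
// as Analyzer::execute_and_check below.
fn apply_rules(rules: &[Box<dyn Rule>], mut plan: String) -> Result<String, String> {
    for rule in rules {
        plan = rule
            .analyze(plan)
            .map_err(|e| format!("{}: {}", rule.name(), e))?;
    }
    Ok(plan)
}
```

Each rule consumes the plan and produces a new one, so no defensive copies are needed on the happy path.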
The Analyzer itself is just a collection of rules, responsible for applying them to the incoming LogicalPlan. It also holds a separate collection of function-rewrite rules, which we will not look at here.
#[derive(Clone, Debug)]
pub struct Analyzer {
/// Expr --> Function rewrites to apply prior to analysis passes
pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
/// All rules to apply
pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
}

The Analyzer's execution flow:
pub fn execute_and_check<F>(
&self,
plan: LogicalPlan,
config: &ConfigOptions,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
// record the start time
let start_time = Instant::now();
// ownership is moved; the old plan can be considered gone
let mut new_plan = plan;
// assemble the rule that rewrites expressions into function calls
let expr_to_function: Option<Arc<dyn AnalyzerRule + Send + Sync>> =
if self.function_rewrites.is_empty() {
None
} else {
Some(Arc::new(ApplyFunctionRewrites::new(
self.function_rewrites.clone(),
)))
};
// chain it together with the analyzer rules; the expression-rewrite rule
// must run before the others, because it depends on the input argument types
let rules = expr_to_function.iter().chain(self.rules.iter());
// TODO add common rule executor for Analyzer and Optimizer
// apply the analyzer rules one by one
for rule in rules {
new_plan = rule.analyze(new_plan, config).map_err(|e| {
DataFusionError::Context(rule.name().to_string(), Box::new(e))
})?;
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}
// after all rules have been applied, validate the plan and raise an error if it is invalid
check_plan(&new_plan).map_err(|e| {
DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
})?;
log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}

The default analyzer rules are currently the following:
pub fn new() -> Self {
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
Arc::new(InlineTableScan::new()),
// Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule].
Arc::new(ExpandWildcardRule::new()),
// [Expr::Wildcard] should be expanded before [TypeCoercion]
Arc::new(ResolveGroupingFunction::new()),
Arc::new(TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
];
Self::with_rules(rules)
}

- InlineTableScan: if a subquery has no filter condition, the intermediate subquery can be removed and the query can read directly from the subquery's source table; in effect, one layer of intermediate table is deleted.
- ExpandWildcardRule: expands wildcards
- ResolveGroupingFunction: replaces grouping functions with internal group ids
- TypeCoercion: type coercion
- CountWildcardRule: rewrites count(*) into count of a constant
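To make the last rule concrete, here is a toy illustration of the kind of rewrite CountWildcardRule performs: count(*) becomes count(1). The `Expr` enum below is a tiny stand-in, not DataFusion's Expr type:

```rust
// A toy expression tree, assuming only the variants needed for the example.
#[derive(Debug, PartialEq)]
enum Expr {
    Wildcard,
    Literal(i64),
    Count(Box<Expr>),
}

// Replace count(*) with count(1); every other node passes through untouched.
fn rewrite_count_wildcard(expr: Expr) -> Expr {
    match expr {
        Expr::Count(inner) => match *inner {
            Expr::Wildcard => Expr::Count(Box::new(Expr::Literal(1))),
            other => Expr::Count(Box::new(rewrite_count_wildcard(other))),
        },
        other => other,
    }
}
```

The real rule works on DataFusion's full Expr type and walks the whole plan, but the core transformation is exactly this pattern match.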
Optimizer
After the analyzer comes the optimizer. The logic of the two is essentially the same: apply the registered rules to the incoming LogicalPlan.
pub struct Optimizer {
/// All optimizer rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

The input and output are still a LogicalPlan; all the interesting logic lives in the rules.
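The driver loop we are about to read has one extra twist compared with the analyzer: rules are applied repeatedly, up to a configurable number of passes, and the loop stops early once a pass produces a plan that has been seen before (a fixed point). A minimal sketch of that shape, assuming a toy String plan and using the plan itself as its "signature" (DataFusion uses a hash-based LogicalPlanSignature instead):

```rust
use std::collections::HashSet;

// Run all rules up to `max_passes` times, stopping once a pass yields a
// plan we have already seen.
fn optimize(mut plan: String, rules: &[fn(String) -> String], max_passes: usize) -> String {
    let mut seen = HashSet::new();
    seen.insert(plan.clone());
    for _pass in 0..max_passes {
        for rule in rules {
            plan = rule(plan);
        }
        // HashSet::insert returns false if the value was already present,
        // i.e. this pass produced nothing new -- stop iterating.
        if !seen.insert(plan.clone()) {
            break;
        }
    }
    plan
}

// A hypothetical rule used only for illustration: strip one redundant
// "+noop" marker per invocation, so convergence takes several passes.
fn strip_one_noop(plan: String) -> String {
    plan.replacen("+noop", "", 1)
}
```

The signature set is what makes the loop safe against rules that oscillate: as soon as any previously seen plan reappears, iteration ends.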
Let's first look at how the optimizer applies its registered rules:
impl Optimizer {
/// Optimizes the logical plan by applying optimizer rules, and
/// invoking observer function after each call
pub fn optimize<F>(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let start_time = Instant::now();
let options = config.options();
let mut new_plan = plan;
// record the plan's optimization history
let mut previous_plans = HashSet::with_capacity(16);
previous_plans.insert(LogicalPlanSignature::new(&new_plan));
// iterate over the rules; the maximum number of passes is configurable
let mut i = 0;
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
for rule in &self.rules {
// If skipping failed rules, copy plan before attempting to rewrite
// as rewriting is destructive
let prev_plan = options
.optimizer
.skip_failed_rules
.then(|| new_plan.clone());
let starting_schema = Arc::clone(new_plan.schema());
// apply the rule to rewrite the plan
let result = match rule.apply_order() {
// optimizer handles recursion
Some(apply_order) => new_plan.rewrite_with_subqueries(
&mut Rewriter::new(apply_order, rule.as_ref(), config),
),
// rule handles recursion itself
None => optimize_plan_node(new_plan, rule.as_ref(), config),
}
// verify that the schema is unchanged by the rewrite
.and_then(|tnr| {
assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
Ok(tnr)
});
// Handle results
match (result, prev_plan) {
// OptimizerRule was successful
(
Ok(Transformed {
data, transformed, ..
}),
_,
) => {
// on success, notify the observer and log some information
new_plan = data;
observer(&new_plan, rule.as_ref());
if transformed {
log_plan(rule.name(), &new_plan);
} else {
debug!(
"Plan unchanged by optimizer rule '{}' (pass {})",
rule.name(),
i
);
}
}
// OptimizerRule was unsuccessful, but skipped failed rules is on
// so use the previous plan
(Err(e), Some(orig_plan)) => {
// Note to future readers: if you see this warning it signals a
// bug in the DataFusion optimizer. Please consider filing a ticket
// https://github.com/apache/datafusion
warn!(
"Skipping optimizer rule '{}' due to unexpected error: {}",
rule.name(),
e
);
new_plan = orig_plan;
}
// OptimizerRule was unsuccessful, but skipped failed rules is off, return error
(Err(e), None) => {
return Err(e.context(format!(
"Optimizer rule '{}' failed",
rule.name()
)));
}
}
}
log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
// HashSet::insert returns whether the value was newly inserted.
// record the new plan's signature in the set
let plan_is_fresh =
previous_plans.insert(LogicalPlanSignature::new(&new_plan));
if !plan_is_fresh {
// plan did not change, so no need to continue trying to optimize
debug!("optimizer pass {} did not make changes", i);
break;
}
i += 1;
}
log_plan("Final optimized plan", &new_plan);
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}

The core logic, then, is the following piece of code:
let result = match rule.apply_order() {
// optimizer handles recursion
Some(apply_order) => new_plan.rewrite_with_subqueries(
&mut Rewriter::new(apply_order, rule.as_ref(), config),
),
// rule handles recursion itself
None => optimize_plan_node(new_plan, rule.as_ref(), config),
}
// verify that the schema is unchanged by the rewrite
.and_then(|tnr| {
assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
Ok(tnr)
);

These two match arms differ only in who drives the traversal; in the end, both call the function below to apply each rule:
fn optimize_plan_node(
plan: LogicalPlan,
rule: &dyn OptimizerRule,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if rule.supports_rewrite() {
return rule.rewrite(plan, config);
}
#[allow(deprecated)]
rule.try_optimize(&plan, config).map(|maybe_plan| {
match maybe_plan {
Some(new_plan) => {
// if the node was rewritten by the optimizer, replace the node
Transformed::yes(new_plan)
}
None => Transformed::no(plan),
}
})
}

You can implement your own optimizer rules, and DataFusion itself also ships a set of defaults, added when the Optimizer is created:
pub fn new() -> Self {
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
// merge nested unions into a single union
Arc::new(EliminateNestedUnion::new()),
// expression simplification, e.g. b > 2 AND b > 2 becomes b > 2
Arc::new(SimplifyExpressions::new()),
// remove unnecessary casts, e.g. if c1 has type INT32, cast(c1 as INT64) > INT64(10) becomes c1 > 10
Arc::new(UnwrapCastInComparison::new()),
// replace DISTINCT with an aggregate
Arc::new(ReplaceDistinctWithAggregate::new()),
// eliminate a join whose condition is false; replace an inner join whose condition is true with a cross join
Arc::new(EliminateJoin::new()),
// rewrite IN / EXISTS subqueries as semi/anti joins
Arc::new(DecorrelatePredicateSubquery::new()),
// turn a subquery used as a filter condition into a join
Arc::new(ScalarSubqueryToJoin::new()),
// extract the equijoin predicates and the other predicates out of conjunctive join predicates
// (a join condition made up of multiple conjoined predicates)
Arc::new(ExtractEquijoinPredicate::new()),
// remove duplicated expressions
Arc::new(EliminateDuplicatedExpr::new()),
// remove unnecessary filters
Arc::new(EliminateFilter::new()),
// remove unnecessary cross joins
Arc::new(EliminateCrossJoin::new()),
// common subexpression elimination: an expression used multiple times can be
// factored out and reused, so it is computed only once in the whole query
Arc::new(CommonSubexprEliminate::new()),
// remove unnecessary limits
Arc::new(EliminateLimit::new()),
Arc::new(PropagateEmptyRelation::new()),
// Must be after PropagateEmptyRelation
// eliminate a union with only a single input
Arc::new(EliminateOneUnion::new()),
// if a key in an equijoin condition is nullable, insert an IS NOT NULL filter,
// since null values can never match anyway
Arc::new(FilterNullJoinKeys::default()),
// try to convert outer joins into inner joins
Arc::new(EliminateOuterJoin::new()),
// Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
// push down limits
Arc::new(PushDownLimit::new()),
// push down filters
Arc::new(PushDownFilter::new()),
Arc::new(SingleDistinctToGroupBy::new()),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
// rerun some of the earlier rules, since the optimizations above may have added new expressions and projections
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateGroupByConstant::new()),
Arc::new(OptimizeProjections::new()),
];
Self::with_rules(rules)
}

Whether a rule rewrites into a brand-new node or modifies the plan in place does not really matter here. Let's use one example to illustrate the rewrite flow of a rule: optimizing a cross join into an inner join.
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let plan_schema = Arc::clone(plan.schema());
let mut possible_join_keys = JoinKeySet::new();
let mut all_inputs: Vec<LogicalPlan> = vec![];
let mut all_filters: Vec<Expr> = vec![];
let parent_predicate = if let LogicalPlan::Filter(filter) = plan {
// if input isn't a join that can potentially be rewritten
// avoid unwrapping the input
// the current node is a Filter and its input is a Join, so it can be rewritten;
// here both cross join and inner join are represented by JoinType::Inner
let rewriteable = matches!(
filter.input.as_ref(),
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
})
);
// not rewritable, skip it
if !rewriteable {
// recursively try to rewrite children
return rewrite_children(self, LogicalPlan::Filter(filter), config);
}
// can the join be flattened? if a join's child is itself a join, all of them must be inner/cross joins
if !can_flatten_join_inputs(&filter.input) {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
// destructure the Filter and flatten the join inputs below it
let Filter {
input, predicate, ..
} = filter;
flatten_join_inputs(
Arc::unwrap_or_clone(input),
&mut possible_join_keys,
&mut all_inputs,
&mut all_filters,
)?;
// extract join keys from the WHERE clause
extract_possible_join_keys(&predicate, &mut possible_join_keys);
Some(predicate)
} else if matches!(
plan,
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
})
) {
// handle a Join node
if !can_flatten_join_inputs(&plan) {
return Ok(Transformed::no(plan));
}
flatten_join_inputs(
plan,
&mut possible_join_keys,
&mut all_inputs,
&mut all_filters,
)?;
None
} else {
// recursively try to rewrite children
return rewrite_children(self, plan, config);
};
// assemble the collected inputs back into a chain of inner joins
let mut all_join_keys = JoinKeySet::new();
let mut left = all_inputs.remove(0);
while !all_inputs.is_empty() {
left = find_inner_join(
left,
&mut all_inputs,
&possible_join_keys,
&mut all_join_keys,
)?;
}
left = rewrite_children(self, left, config)?.data;
if &plan_schema != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(left),
Arc::clone(&plan_schema),
));
}
if !all_filters.is_empty() {
// Add any filters on top - PushDownFilter can push filters down to applicable join
let first = all_filters.swap_remove(0);
let predicate = all_filters.into_iter().fold(first, and);
left = LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(left))?);
}
let Some(predicate) = parent_predicate else {
return Ok(Transformed::yes(left));
};
// there are no usable join keys, so just keep the predicate as a plain filter
if all_join_keys.is_empty() {
Filter::try_new(predicate, Arc::new(left))
.map(|filter| Transformed::yes(LogicalPlan::Filter(filter)))
} else {
// predicates that were used as join conditions are removed from the filter
match remove_join_expressions(predicate, &all_join_keys) {
Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left))
.map(|filter| Transformed::yes(LogicalPlan::Filter(filter))),
_ => Ok(Transformed::yes(left)),
}
}
}

The basis for turning a cross join into an inner join is the existence of filter conditions that can serve as join conditions. The rule therefore collects the filter conditions and the tables involved in the join, and turns the cross join into an inner join that carries a proper join condition.
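The key step described above, splitting a conjunctive WHERE clause into equijoin-key candidates and residual filter predicates, can be sketched in miniature. The `Pred` enum and `split_join_keys` function below are illustrative stand-ins, not DataFusion's types:

```rust
// A toy predicate tree, assuming only the shapes needed for the example.
#[derive(Debug, PartialEq)]
enum Pred {
    ColEq(String, String),     // a.x = b.y -> candidate join key
    Other(String),             // anything else stays a filter
    And(Box<Pred>, Box<Pred>), // conjunction
}

// Walk the conjunction: column equalities become join-key candidates,
// everything else is kept as a residual filter predicate.
fn split_join_keys(pred: Pred, keys: &mut Vec<(String, String)>, rest: &mut Vec<Pred>) {
    match pred {
        Pred::And(l, r) => {
            split_join_keys(*l, keys, rest);
            split_join_keys(*r, keys, rest);
        }
        Pred::ColEq(a, b) => keys.push((a, b)),
        other => rest.push(other),
    }
}
```

For a WHERE clause like `a.id = b.id AND a.v > 10`, the equality lands in the key list and powers the inner join, while `a.v > 10` remains as a filter on top, which is exactly the split `extract_possible_join_keys` and `remove_join_expressions` perform in the real rule.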