The description that actually drives execution is the ExecutionPlan, which is derived from the LogicalPlan and then refined by physical optimization. Let's first look at what an execution operator looks like.

First of all, ExecutionPlan is a trait that defines the behavior every operator must implement:

pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    /// Human-readable name of the operator
    fn name(&self) -> &str;
 
    /// Same as `name`, but callable without an instance
    fn static_name() -> &'static str
    where
        Self: Sized,
    {
        let full_name = std::any::type_name::<Self>();
        let maybe_start_idx = full_name.rfind(':');
        match maybe_start_idx {
            Some(start_idx) => &full_name[start_idx + 1..],
            None => "UNKNOWN",
        }
    }
 
    /// Returns the execution plan as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;
 
    /// Returns the schema of the output
    fn schema(&self) -> SchemaRef {
        Arc::clone(self.properties().schema())
    }
 
    /// Returns the properties of the output, e.g. ordering and partitioning
    fn properties(&self) -> &PlanProperties;

    /// Specifies the data distribution required of each child
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }

    /// Specifies the ordering required of each child
    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        vec![None; self.children().len()]
    }
 
    /// Returns false for each child whose input ordering this operator
    /// does not preserve (i.e. the operator may reorder rows)
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false; self.children().len()]
    }

    /// Whether this operator benefits from partitioned input
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // By default try to maximize parallelism with more CPUs if
        // possible
        self.required_input_distribution()
            .into_iter()
            .map(|dist| !matches!(dist, Distribution::SinglePartition))
            .collect()
    }
 
    /// Returns the children of this plan
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
 
    /// Returns a new plan with the given children
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
 
    /// Returns a repartitioned version of this plan, if supported
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
 
    /// Executes one partition of the operator, returning a stream of RecordBatches
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
 
    /// Returns the operator's runtime metrics
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }
 
    /// Returns statistics about the output
    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }
 
 
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
 
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
 
    fn fetch(&self) -> Option<usize> {
        None
    }
 
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Unknown
    }
}

There are quite a few methods here, but the part we care about is the computation itself: execute, which produces the result stream, and schema, which describes the shape of that result.
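To see these two methods in action, here is a minimal sketch of driving an ExecutionPlan end to end. It assumes a recent DataFusion version, a tokio runtime, and a local data.csv file (the file and the table name t are placeholders); create_physical_plan on DataFrame is a convenience method that already includes the planning steps described later.

use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "data.csv" is a placeholder input file
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT * FROM t LIMIT 10").await?;
    // Obtain an Arc<dyn ExecutionPlan> for the query
    let plan = df.create_physical_plan().await?;
    println!("root operator: {}", plan.name());
    println!("output schema: {:?}", plan.schema());
    // Execute partition 0: the result is a stream of RecordBatches
    let mut stream = plan.execute(0, ctx.task_ctx())?;
    while let Some(batch) = stream.next().await {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}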

Below the trait sit the concrete operator implementations. If you need the logic of a specific operator, you can read its implementation directly; we won't go further into those details here.

Next, let's look at how the ExecutionPlan is produced. Similar to LogicalPlan, it is built in two steps:

  1. A PhysicalPlanner converts the LogicalPlan into an ExecutionPlan
  2. Physical optimization rules are applied to turn that ExecutionPlan into a more efficient one

The way physical optimization rules are applied in step 2 is essentially the same as for logical optimization rules, so we skip it here and mainly look at how a LogicalPlan is converted into an ExecutionPlan.
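Even though we skip the details of step 2, for reference a physical rule has roughly the following shape. This is a no-op skeleton, assuming DataFusion's PhysicalOptimizerRule trait; exact module paths vary slightly between versions.

use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

#[derive(Debug)]
struct NoopRule;

impl PhysicalOptimizerRule for NoopRule {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A real rule pattern-matches on the plan tree and returns a rewritten plan
        Ok(plan)
    }

    fn name(&self) -> &str {
        "noop"
    }

    // Whether the optimizer should verify that the rule preserved the schema
    fn schema_check(&self) -> bool {
        true
    }
}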

PhysicalPlanner

PhysicalPlanner is defined as a trait:

#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
    /// Create a physical plan from a logical plan
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>>;
 
    /// Create a physical expression from a logical expression
    /// suitable for evaluation
    ///
    /// `expr`: the expression to convert
    ///
    /// `input_dfschema`: the logical plan schema for evaluating `expr`
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}

It specifies two methods: one converts a LogicalPlan into an ExecutionPlan, and the other converts a logical Expr into a PhysicalExpr suitable for evaluation.

The default implementation is DefaultPhysicalPlanner:

#[async_trait]
impl PhysicalPlanner for DefaultPhysicalPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match self.handle_explain(logical_plan, session_state).await? {
            Some(plan) => Ok(plan), // EXPLAIN statements are handled separately
            None => { // everything else goes through the two steps below
                // 1. Convert the LogicalPlan into an ExecutionPlan
                let plan = self
                    .create_initial_plan(logical_plan, session_state)
                    .await?;
                // 2. Apply the physical optimization rules
                self.optimize_physical_plan(plan, session_state, |_, _| {})
            }
        }
    }
 
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        create_physical_expr(expr, input_dfschema, session_state.execution_props())
    }
}
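As a quick sketch of how this trait is driven by hand, the planner can be instantiated and invoked directly. The helper below is hypothetical, and module paths may differ slightly between DataFusion versions.

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};

// Hypothetical helper: run the full LogicalPlan -> ExecutionPlan conversion
async fn to_physical(
    logical: &LogicalPlan,
    state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
    let planner = DefaultPhysicalPlanner::default();
    // Runs create_initial_plan and then the physical optimizer rules
    planner.create_physical_plan(logical, state).await
}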

The function responsible for the conversion itself is create_initial_plan:

    async fn create_initial_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Flatten the LogicalPlan tree into an array via a DFS traversal,
        // using dfs_visit_stack as an explicit stack.
        // The flattened nodes are stored in flat_tree; the indices of the
        // leaf nodes are collected in flat_tree_leaf_indices.
        let mut flat_tree = vec![];
        let mut dfs_visit_stack = vec![(None, logical_plan)];
        let mut flat_tree_leaf_indices = vec![];
        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
            let current_index = flat_tree.len();
 
            dfs_visit_stack
                .extend(node.inputs().iter().map(|&n| (Some(current_index), n)));
            let state = match node.inputs().len() {
                0 => {
                    // Leaf node: record its index in flat_tree_leaf_indices
                    flat_tree_leaf_indices.push(current_index);
                    NodeState::ZeroOrOneChild
                }
                1 => NodeState::ZeroOrOneChild,
                _ => {
                    let ready_children = Vec::with_capacity(node.inputs().len());
                    let ready_children = Mutex::new(ready_children);
                    NodeState::TwoOrMoreChildren(ready_children)
                }
            };
            let node = LogicalNode {
                node,
                parent_index,
                state,
            };
            flat_tree.push(node);
        }
        let flat_tree = Arc::new(flat_tree);
 
        let planning_concurrency = session_state
            .config_options()
            .execution
            .planning_concurrency;
        
        // Cap the planning concurrency: no more than the configured value
        // (the number of CPU cores by default) and the number of leaf nodes
        let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len());
 
        // Build one task per leaf node, each walking upwards from its leaf
        let tasks = flat_tree_leaf_indices
            .into_iter()
            .map(|index| self.task_helper(index, flat_tree.clone(), session_state));
        // Run the tasks concurrently and collect the results
        let mut outputs = futures::stream::iter(tasks)
            .buffer_unordered(max_concurrency)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        // Ideally this never happens if we have a valid LogicalPlan tree
        if outputs.len() != 1 {
            return internal_err!(
                "Failed to convert LogicalPlan to ExecutionPlan: More than one root detected"
            );
        }
        let plan = outputs.pop().unwrap();
        Ok(plan)
    }
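The flattening step is easier to see on a toy tree. The following is a standalone illustration of the same idea (not DataFusion code; Node and flatten are made up for this example):

struct Node {
    children: Vec<Node>,
}

// Flatten a tree into a Vec of (parent_index, node) pairs with an explicit
// DFS stack, recording the indices of leaf nodes along the way
fn flatten(root: &Node) -> (Vec<(Option<usize>, &Node)>, Vec<usize>) {
    let mut flat = vec![];
    let mut leaves = vec![];
    let mut stack = vec![(None, root)];
    while let Some((parent, node)) = stack.pop() {
        let idx = flat.len();
        stack.extend(node.children.iter().map(|c| (Some(idx), c)));
        if node.children.is_empty() {
            leaves.push(idx); // each leaf becomes the starting point of a task
        }
        flat.push((parent, node));
    }
    (flat, leaves)
}

fn main() {
    // A root with two leaves, mirroring e.g. a join over two table scans
    let root = Node {
        children: vec![Node { children: vec![] }, Node { children: vec![] }],
    };
    let (flat, leaves) = flatten(&root);
    assert_eq!(flat.len(), 3);
    assert_eq!(leaves, vec![1, 2]); // both leaves record parent index 0
}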

What remains is how a task is built starting from each leaf node:

async fn task_helper<'a>(
    &'a self,
    leaf_starter_index: usize,
    flat_tree: Arc<Vec<LogicalNode<'a>>>,
    session_state: &'a SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // The leaf node this task starts from
    let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| {
        internal_datafusion_err!(
            "Invalid index whilst creating initial physical plan"
        )
    })?;
    // Convert the leaf LogicalNode into an ExecutionPlan
    let mut plan = self
        .map_logical_node_to_physical(
            node.node,
            session_state,
            ChildrenContainer::None,
        )
        .await?;
    let mut current_index = leaf_starter_index;
 
    // Walk upwards from the leaf until the root is reached
    while let Some(parent_index) = node.parent_index {
        // Look up the parent node
        node = flat_tree.get(parent_index).ok_or_else(|| {
            internal_datafusion_err!(
                "Invalid index whilst creating initial physical plan"
            )
        })?;
        match &node.state {
            // The parent has only this child, so it can be converted
            // into a physical node right away
            NodeState::ZeroOrOneChild => {
                plan = self
                    .map_logical_node_to_physical(
                        node.node,
                        session_state,
                        ChildrenContainer::One(plan),
                    )
                    .await?;
            }
            // The parent has multiple children: wrap the current plan in an
            // ExecutionPlanChild and add it to the parent's children. Only
            // after all children have been planned is the parent itself
            // converted into an ExecutionPlan.
            NodeState::TwoOrMoreChildren(children) => {
                let mut children: Vec<ExecutionPlanChild> = {
                    let mut guard = children.lock().await;
                    // Register this subtree with the parent
                    guard.push(ExecutionPlanChild {
                        index: current_index,
                        plan,
                    });
                    // Only the task that delivers the last child keeps
                    // walking upwards; all other tasks return here
                    if guard.len() < node.node.inputs().len() {
                        return Ok(None);
                    }
                    std::mem::take(guard.as_mut())
                };
 
                children.sort_unstable_by_key(|epc| std::cmp::Reverse(epc.index));
                let children = children.into_iter().map(|epc| epc.plan).collect();
                let children = ChildrenContainer::Multiple(children);
                plan = self
                    .map_logical_node_to_physical(node.node, session_state, children)
                    .await?;
            }
        }
        current_index = parent_index;
    }
    // For a valid plan tree, exactly one task reaches this point
    Ok(Some(plan))
}

From the logic above, for a valid plan exactly one task ends up returning the root ExecutionPlan; every other task returns Ok(None) as soon as it hands its subtree to a multi-child parent whose remaining children are still being planned. The bottom-up traversal from the leaves effectively performs a stage-like partitioning: all nodes on the path from a leaf up to the first multi-child ancestor are folded into a single ExecutionPlanChild. Put simply, each such chain can be planned independently and concurrently within its own task, as the example below shows.
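For example, given a plan shaped like

Union
├── Projection ─ Filter ─ Scan(a)
└── Scan(b)

two tasks start concurrently: one plans Scan(a), then Filter, then Projection as a single chain, while the other plans Scan(b). Whichever task delivers its subtree to Union last converts Union itself and continues towards the root; the other task returns Ok(None).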