DAGScheduler任务调度当中学习了spark的查询任务是如何分割并调度执行的。这里再学习一下starrocks又是如何将一个大查询变为多个并行的子查询并调度执行的。

虽然在不同的计算引擎当中有着不同的代码实现,但是归根到底核心的思想就是分治+并发。查询拆分地越是细致,那么其能够并发的上限越高,资源的管控也同样更加严密。

前篇一律的sql的解析、优化这里就直接跳过。这里的核心关注点在于最终的物理执行计划是如何分割的,被调度到be上,以及在be上又是如何执行的。

FE的物理计划切割与调度

物理计划的构建,以及分片调度发送给BE执行的流程是在FE上完成的。

PlanFragment构建

首先要知道的是,一个查询首先被解析为一个完成的物理执行计划树。我们这里关注的第一件事,就是如何将一个完成的物理执行计划分割为多个PlanFragment,每个PlanFragment都是一段可以完整运行的执行计划。

完成这一点需要对物理执行计划树进行遍历并转换,这一点的代码入口在com.starrocks.sql.plan.PlanFragmentBuilder.PhysicalPlanTranslator#visit

// 用啦将OptExpression重写为PlanFragment,其中ExecPlan为上下文信息
private static class PhysicalPlanTranslator extends OptExpressionVisitor<PlanFragment, ExecPlan> {
 
	// OptExpression就是物理执行计划,是对Operator的封装,因为涉及到对树的重写,所以使用OptExpression封装了一下
	public PlanFragment visit(OptExpression optExpression, ExecPlan context) {  
	    canUseLocalShuffleAgg &= optExpression.arity() <= 1;  
	    PlanFragment fragment = optExpression.getOp().accept(this, optExpression, context);  
	    Projection projection = (optExpression.getOp()).getProjection();  
	  
	    if (projection != null) {  
	        fragment = buildProjectNode(optExpression, projection, fragment, context);  
	    }  
	    PlanNode planRoot = fragment.getPlanRoot();  
	    if (!(optExpression.getOp() instanceof PhysicalProjectOperator) && planRoot instanceof ProjectNode) {  
	        // This projectNode comes from another node's projection field  
	        planRoot = planRoot.getChild(0);  
	    }  
	    context.recordPlanNodeId2OptExpression(planRoot.getId().asInt(), optExpression);  
	    return fragment;  
	}
}

这个类当中还有很多转换不同的算子的实现方法,虽然每个方法看着都是接受OptExpression,返回PlanFragment。但实际上并不是每个方法都创建了新的PlanFragment,很多只是往已有的PlanFragment添加算子。因此我们这里主要关注的是什么时候会创建新的PlanFragment,据此来了解PlanFragment的分割逻辑。

简单分析了一些创建PlanFragment的地方,发现有如下特点:

  1. 绝大部分都是在数据源处创建的PlanFragment,其他大部分算子都是往已有的PlanFragment当中追加而不是新建
  2. 部分算子,例如topN, CTE, shuffle等,这类涉及到全量数据扫描的算子,会创建新的PlanFragment,并设置父子关系。两者之间通过一个ExchangeNode相连,子PlanFragment的数据会发往这个ExchangeNode,而父PlanFragment会从这个ExchangeNode取数。

总结下来和spark的stage切割的原理类似,都是以需要跨节点网络通信为界进行分割。这部分操作之后PlanFragment之间建立了依赖关系,并且每个PlanFragment都可以在一个计算节点上执行下来。

PlanFragment的调度

由于每个PlanFragment都是可以在一个节点上完整执行的计算片段,所以只要按照构建出来的PlanFragment依赖关系,从无依赖的PlanFragment开始调度执行,最终就能够执行完整个计算。

PlanFragment调度的代码入口在com.starrocks.qe.scheduler.dag.AllAtOnceExecutionSchedule#schedule,目前调度方式有两种,这里选择了其中一种进行说明。

public void schedule(Coordinator.ScheduleOption option) throws RpcException, StarRocksException {  
    List<DeployState> states = new ArrayList<>();  
    // 拓扑排序,保证按照依赖关系顺序执行
    for (List<ExecutionFragment> executionFragments : dag.getFragmentsInTopologicalOrderFromRoot()) {  
        final DeployState deployState = deployer.createFragmentExecStates(executionFragments);  
        // 将PlanFragment调度到worker
        deployer.deployFragments(deployState);  
        states.add(deployState);  
    }  
  
    ExecutorService executorService = null;  
    if (option.useQueryDeployExecutor) {  
        executorService = GlobalStateMgr.getCurrentState().getQueryDeployExecutor();  
    }  
  
    DeployMoreScanRangesTask task = new DeployMoreScanRangesTask(states, executorService);  
    task.start();  
}

到这里停止,先不需要继续细致了解代码了,PlanFragment如何划分出来以及如何调度的过程目前看下来都比较清晰了。

剩下的PlanFragment如何发送到worker的代码,可以从代码入口com.starrocks.qe.scheduler.dag.FragmentInstanceExecState#deployAsync开始查看。

FE的流程到此为止。

BE fragment执行流程

我们主要看BE是如何执行PlanFragment的。

FE通过com.starrocks.rpc.PBackendService#execPlanFragmentAsync将PlanFragment发送给BE,那么就从BE上这个函数的server端实现开始,了解其整体的执行流程。

根据方法上的注解,我们最终找到了src/service/internal_service.hexec_plan_fragment函数实现。 有几个比较相关的方法

exec_plan_fragment
fetch_data
cancel_plan_fragment

分别用来提及执行fragment,取数,控制fragment执行。

fragment任务提交

exec_plan_fragment方法

template <typename T>
void PInternalServiceImplBase<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
                                                     const PExecPlanFragmentRequest* request,
                                                     PExecPlanFragmentResult* response,
                                                     google::protobuf::Closure* done) {
    // 构建任务
    auto task = [=]() { this->_exec_plan_fragment(cntl_base, request, response, done); };
    // 提交任务到线程池
    if (!_exec_env->query_rpc_pool()->try_offer(std::move(task))) {
        ClosureGuard closure_guard(done);
        Status::ServiceUnavailable("submit exec_plan_fragment task failed").to_protobuf(response->mutable_status());
    }
}

这里的逻辑还是非常清晰的,就是基于请求传递的Fragment创建一个Task,然后提交到线程池中执行。

任务的构建和执行涉及到内容比较多,这里我们先了解任务是如何提交到线程池当中,并且是如何被调度执行的。

任务提交到的线程池是一个优先级线程池,其中有一个阻塞队列,接受外部提交的任务

class PriorityThreadPool {
	BlockingPriorityQueue<Task> _work_queue;
	
	bool try_offer(const Task& task) { return _work_queue.try_put(task); 
}

既然往这个队列中添加任务,那么自然有地方从队列中取出任务并执行。

class PriorityThreadPool {
    PriorityThreadPool(std::string name, uint32_t num_threads, uint32_t queue_size)
            : _name(std::move(name)), _work_queue(queue_size), _shutdown(false) {
        // 启动指定个数的线程
        for (int i = 0; i < num_threads; ++i) {
            new_thread(++_current_thread_id);
        }
    }
    
    void new_thread(int tid) {
	    // 创建线程对象,线程执行的函数为work_thread
        auto* thr = _threads.create_thread(std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, tid));
        Thread::set_thread_name(thr->native_handle(), _name);
        _threads_holder.emplace_back(thr, tid);
    }
 
	// 线程池中每个线程的驱动方法,不断从任务队列中取出任务并执行。直到线程池终止
	void work_thread(int thread_id) {
        while (!is_shutdown()) {
            Task task;
            // 阻塞等待获得任务
            if (_work_queue.blocking_get(&task)) {
	            // 执行任务
                task.work_function();
            }
            // 任务队列为空,通知一些阻塞等待任务全部执行结束的操作,例如释放线程池
            if (_work_queue.get_size() == 0) {
                _empty_cv.notify_all();
            }
            // 执行结束之后判断是否要结束掉这个线程。这样可以在运行过程中调节动态缩小线程数
            if (_should_decrease) {
                bool need_destroy = true;
                int32_t expect;
                int32_t target;
                do {
                    expect = _should_decrease;
                    target = expect - 1;
                    if (expect == 0) {
                        need_destroy = false;
                        break;
                    }
                } while (!_should_decrease.compare_exchange_weak(expect, target));
                if (need_destroy) {
                    remove_thread(thread_id);
                    break;
                }
            }
        }
    
}

逻辑也是非常清晰的,PriorityThreadPool构造之后就会启动指定个数的线程,然后等待从任务队列中取出任务,并执行。

Fragment调度

fragment提交并调度执行的逻辑已经介绍过了,下面就是Fragment具体的执行流程了。 追踪_exec_plan_fragment到最后可以到达如下的函数。执行fragment有pipeline和非pipeline两种方式,这里我们只看pipeline方式

template <typename T>
Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline(const TExecPlanFragmentParams& t_common_param,
                                                                    const TExecPlanFragmentParams& t_unique_request) {
    // 1. 获取Fragment执行器
    pipeline::FragmentExecutor fragment_executor;
    // 2. 做好执行前的准备工作。初始化上下文,执行需要的资源等等
    auto status = fragment_executor.prepare(_exec_env, t_common_param, t_unique_request);
    if (status.ok()) {
	    // 3. 开始执行
        return fragment_executor.execute(_exec_env);
    } else {
        return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
    }
}

整体上可以分为两个阶段

  1. 执行前的准备阶段
  2. 执行阶段 其中第一阶段就是准备好执行过程需要的资源,事项比较多,但在本节当中我们重点需要关注的只有pipeline的构建。即FragmentExecutor::_prepare_pipeline_driver函数。
Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
	// 创建用户构建pipeline的上下文
	PipelineBuilderContext context(_fragment_ctx.get(), degree_of_parallelism, sink_dop, is_stream_pipeline);
    context.init_colocate_groups(std::move(_colocate_exec_groups));
    PipelineBuilder builder(context);
    // 解析为pipeline
	auto exec_ops = builder.decompose_exec_node_to_pipeline(*_fragment_ctx, plan);
	_fragment_ctx->set_data_sink(std::move(datasink));
	// 构建出exec_groups和pipelines,并设置到FragmentContext当中去
    auto [exec_groups, pipelines] = builder.build();
    _fragment_ctx->set_pipelines(std::move(exec_groups), std::move(pipelines));
 
}

具体的逻辑暂不深入,从上面的代码来看,是将Fragment解析为了了一组ExecGroups和Pipelines。这是比Fragment更加细粒度的执行单元,毕竟Fragment只是单节点粒度执行的,而Pipelines则能够在单节点的基础上进一步并发执行。

而ExecGroups就是用来执行pipeline的,因为执行过程中还需要记录一些状态或者管理pipline的执行,因此使用这个对象来包装pipeline的执行流程。

下面来看执行阶段。

Status FragmentExecutor::execute(ExecEnv* exec_env) {
    bool prepare_success = false;
    DeferOp defer([this, &prepare_success]() {
        if (!prepare_success) {
            _fail_cleanup(true);
        }
    });
 
    auto* profile = _fragment_ctx->runtime_state()->runtime_profile();
    auto* prepare_instance_timer = ADD_TIMER(profile, "FragmentInstancePrepareTime");
    auto* prepare_driver_timer =
            ADD_CHILD_TIMER_THESHOLD(profile, "prepare-pipeline-driver", "FragmentInstancePrepareTime", 10_ms);
 
    {
        SCOPED_TIMER(prepare_instance_timer);
        SCOPED_TIMER(prepare_driver_timer);
        _fragment_ctx->acquire_runtime_filters();
        RETURN_IF_ERROR(_fragment_ctx->prepare_active_drivers());
    }
    prepare_success = true;
 
	// 获取DriverExecutor对象,然后将任务提交到这里进行执行
    DCHECK(_fragment_ctx->enable_resource_group());
    auto* executor = _wg->executors()->driver_executor();
    RETURN_IF_ERROR(_fragment_ctx->submit_active_drivers(executor));
 
    return Status::OK();
}

去掉一些定时器的设置,这个函数的唯一的作用就是将任务提交到DriverExecutor执行。

Status FragmentContext::submit_active_drivers(DriverExecutor* executor) {
    for (auto& group : _execution_groups) {
        group->attach_driver_executor(executor);
        group->submit_active_drivers();
    }
    return Status::OK();
}

将驱动器附加到构建pipeline时一并创建的ExecGroups当中,其实这样就可以确定虽然Fragment被分解为更加细粒度了,但实际上一个Fragment仍然是通过一个DriverExecutor驱动执行的

void NormalExecutionGroup::submit_active_drivers() {
    VLOG_QUERY << "submit_active_drivers:" << to_string();
    return for_each_active_driver(_pipelines, [this](const DriverPtr& driver) { _executor->submit(driver.get()); });
}

归根到底还是将pipeline提交到executor并使用创建的线程池并发执行。

那么到这里一个查询是分析在FE被拆分为Fragment,以及发送到BE之后是如何再次拆分为pipeline并由DriverExecutor驱动并发执行的流程就比较清楚了。

TODO:还遗留了Pipeline的构建流程,以及DriverExecutor是如何驱动Pipeline执行的流程待补充。