在DAGScheduler任务调度当中学习了spark的查询任务是如何分割并调度执行的。这里再学习一下starrocks又是如何将一个大查询变为多个并行的子查询并调度执行的。
虽然在不同的计算引擎当中有着不同的代码实现,但是归根到底核心的思想就是分治+并发。查询拆分地越是细致,那么其能够并发的上限越高,资源的管控也同样更加严密。
前篇一律的sql的解析、优化这里就直接跳过。这里的核心关注点在于最终的物理执行计划是如何分割的,被调度到be上,以及在be上又是如何执行的。
FE的物理计划切割与调度
物理计划的构建,以及分片调度发送给BE执行的流程是在FE上完成的。
PlanFragment构建
首先要知道的是,一个查询首先被解析为一个完成的物理执行计划树。我们这里关注的第一件事,就是如何将一个完成的物理执行计划分割为多个PlanFragment,每个PlanFragment都是一段可以完整运行的执行计划。
完成这一点需要对物理执行计划树进行遍历并转换,这一点的代码入口在com.starrocks.sql.plan.PlanFragmentBuilder.PhysicalPlanTranslator#visit。
// 用啦将OptExpression重写为PlanFragment,其中ExecPlan为上下文信息
private static class PhysicalPlanTranslator extends OptExpressionVisitor<PlanFragment, ExecPlan> {
// OptExpression就是物理执行计划,是对Operator的封装,因为涉及到对树的重写,所以使用OptExpression封装了一下
public PlanFragment visit(OptExpression optExpression, ExecPlan context) {
canUseLocalShuffleAgg &= optExpression.arity() <= 1;
PlanFragment fragment = optExpression.getOp().accept(this, optExpression, context);
Projection projection = (optExpression.getOp()).getProjection();
if (projection != null) {
fragment = buildProjectNode(optExpression, projection, fragment, context);
}
PlanNode planRoot = fragment.getPlanRoot();
if (!(optExpression.getOp() instanceof PhysicalProjectOperator) && planRoot instanceof ProjectNode) {
// This projectNode comes from another node's projection field
planRoot = planRoot.getChild(0);
}
context.recordPlanNodeId2OptExpression(planRoot.getId().asInt(), optExpression);
return fragment;
}
}这个类当中还有很多转换不同的算子的实现方法,虽然每个方法看着都是接受OptExpression,返回PlanFragment。但实际上并不是每个方法都创建了新的PlanFragment,很多只是往已有的PlanFragment添加算子。因此我们这里主要关注的是什么时候会创建新的PlanFragment,据此来了解PlanFragment的分割逻辑。
简单分析了一些创建PlanFragment的地方,发现有如下特点:
- 绝大部分都是在数据源处创建的PlanFragment,其他大部分算子都是往已有的PlanFragment当中追加而不是新建
- 部分算子,例如topN, CTE, shuffle等,这类涉及到全量数据扫描的算子,会创建新的PlanFragment,并设置父子关系。两者之间通过一个
ExchangeNode相连,子PlanFragment的数据会发往这个ExchangeNode,而父PlanFragment会从这个ExchangeNode取数。
总结下来和spark的stage切割的原理类似,都是以需要跨节点网络通信为界进行分割。这部分操作之后PlanFragment之间建立了依赖关系,并且每个PlanFragment都可以在一个计算节点上执行下来。
PlanFragment的调度
由于每个PlanFragment都是可以在一个节点上完整执行的计算片段,所以只要按照构建出来的PlanFragment依赖关系,从无依赖的PlanFragment开始调度执行,最终就能够执行完整个计算。
PlanFragment调度的代码入口在com.starrocks.qe.scheduler.dag.AllAtOnceExecutionSchedule#schedule,目前调度方式有两种,这里选择了其中一种进行说明。
public void schedule(Coordinator.ScheduleOption option) throws RpcException, StarRocksException {
List<DeployState> states = new ArrayList<>();
// 拓扑排序,保证按照依赖关系顺序执行
for (List<ExecutionFragment> executionFragments : dag.getFragmentsInTopologicalOrderFromRoot()) {
final DeployState deployState = deployer.createFragmentExecStates(executionFragments);
// 将PlanFragment调度到worker
deployer.deployFragments(deployState);
states.add(deployState);
}
ExecutorService executorService = null;
if (option.useQueryDeployExecutor) {
executorService = GlobalStateMgr.getCurrentState().getQueryDeployExecutor();
}
DeployMoreScanRangesTask task = new DeployMoreScanRangesTask(states, executorService);
task.start();
}到这里停止,先不需要继续细致了解代码了,PlanFragment如何划分出来以及如何调度的过程目前看下来都比较清晰了。
剩下的PlanFragment如何发送到worker的代码,可以从代码入口com.starrocks.qe.scheduler.dag.FragmentInstanceExecState#deployAsync开始查看。
FE的流程到此为止。
BE fragment执行流程
我们主要看BE是如何执行PlanFragment的。
FE通过com.starrocks.rpc.PBackendService#execPlanFragmentAsync将PlanFragment发送给BE,那么就从BE上这个函数的server端实现开始,了解其整体的执行流程。
根据方法上的注解,我们最终找到了src/service/internal_service.h的exec_plan_fragment函数实现。
有几个比较相关的方法
exec_plan_fragment
fetch_data
cancel_plan_fragment分别用来提及执行fragment,取数,控制fragment执行。
fragment任务提交
即exec_plan_fragment方法
template <typename T>
void PInternalServiceImplBase<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
// 构建任务
auto task = [=]() { this->_exec_plan_fragment(cntl_base, request, response, done); };
// 提交任务到线程池
if (!_exec_env->query_rpc_pool()->try_offer(std::move(task))) {
ClosureGuard closure_guard(done);
Status::ServiceUnavailable("submit exec_plan_fragment task failed").to_protobuf(response->mutable_status());
}
}这里的逻辑还是非常清晰的,就是基于请求传递的Fragment创建一个Task,然后提交到线程池中执行。
任务的构建和执行涉及到内容比较多,这里我们先了解任务是如何提交到线程池当中,并且是如何被调度执行的。
任务提交到的线程池是一个优先级线程池,其中有一个阻塞队列,接受外部提交的任务
class PriorityThreadPool {
BlockingPriorityQueue<Task> _work_queue;
bool try_offer(const Task& task) { return _work_queue.try_put(task);
}既然往这个队列中添加任务,那么自然有地方从队列中取出任务并执行。
class PriorityThreadPool {
PriorityThreadPool(std::string name, uint32_t num_threads, uint32_t queue_size)
: _name(std::move(name)), _work_queue(queue_size), _shutdown(false) {
// 启动指定个数的线程
for (int i = 0; i < num_threads; ++i) {
new_thread(++_current_thread_id);
}
}
void new_thread(int tid) {
// 创建线程对象,线程执行的函数为work_thread
auto* thr = _threads.create_thread(std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, tid));
Thread::set_thread_name(thr->native_handle(), _name);
_threads_holder.emplace_back(thr, tid);
}
// 线程池中每个线程的驱动方法,不断从任务队列中取出任务并执行。直到线程池终止
void work_thread(int thread_id) {
while (!is_shutdown()) {
Task task;
// 阻塞等待获得任务
if (_work_queue.blocking_get(&task)) {
// 执行任务
task.work_function();
}
// 任务队列为空,通知一些阻塞等待任务全部执行结束的操作,例如释放线程池
if (_work_queue.get_size() == 0) {
_empty_cv.notify_all();
}
// 执行结束之后判断是否要结束掉这个线程。这样可以在运行过程中调节动态缩小线程数
if (_should_decrease) {
bool need_destroy = true;
int32_t expect;
int32_t target;
do {
expect = _should_decrease;
target = expect - 1;
if (expect == 0) {
need_destroy = false;
break;
}
} while (!_should_decrease.compare_exchange_weak(expect, target));
if (need_destroy) {
remove_thread(thread_id);
break;
}
}
}
}逻辑也是非常清晰的,PriorityThreadPool构造之后就会启动指定个数的线程,然后等待从任务队列中取出任务,并执行。
Fragment调度
fragment提交并调度执行的逻辑已经介绍过了,下面就是Fragment具体的执行流程了。
追踪_exec_plan_fragment到最后可以到达如下的函数。执行fragment有pipeline和非pipeline两种方式,这里我们只看pipeline方式
template <typename T>
Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline(const TExecPlanFragmentParams& t_common_param,
const TExecPlanFragmentParams& t_unique_request) {
// 1. 获取Fragment执行器
pipeline::FragmentExecutor fragment_executor;
// 2. 做好执行前的准备工作。初始化上下文,执行需要的资源等等
auto status = fragment_executor.prepare(_exec_env, t_common_param, t_unique_request);
if (status.ok()) {
// 3. 开始执行
return fragment_executor.execute(_exec_env);
} else {
return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
}
}整体上可以分为两个阶段
- 执行前的准备阶段
- 执行阶段
其中第一阶段就是准备好执行过程需要的资源,事项比较多,但在本节当中我们重点需要关注的只有pipeline的构建。即
FragmentExecutor::_prepare_pipeline_driver函数。
Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
// 创建用户构建pipeline的上下文
PipelineBuilderContext context(_fragment_ctx.get(), degree_of_parallelism, sink_dop, is_stream_pipeline);
context.init_colocate_groups(std::move(_colocate_exec_groups));
PipelineBuilder builder(context);
// 解析为pipeline
auto exec_ops = builder.decompose_exec_node_to_pipeline(*_fragment_ctx, plan);
_fragment_ctx->set_data_sink(std::move(datasink));
// 构建出exec_groups和pipelines,并设置到FragmentContext当中去
auto [exec_groups, pipelines] = builder.build();
_fragment_ctx->set_pipelines(std::move(exec_groups), std::move(pipelines));
}具体的逻辑暂不深入,从上面的代码来看,是将Fragment解析为了了一组ExecGroups和Pipelines。这是比Fragment更加细粒度的执行单元,毕竟Fragment只是单节点粒度执行的,而Pipelines则能够在单节点的基础上进一步并发执行。
而ExecGroups就是用来执行pipeline的,因为执行过程中还需要记录一些状态或者管理pipline的执行,因此使用这个对象来包装pipeline的执行流程。
下面来看执行阶段。
Status FragmentExecutor::execute(ExecEnv* exec_env) {
bool prepare_success = false;
DeferOp defer([this, &prepare_success]() {
if (!prepare_success) {
_fail_cleanup(true);
}
});
auto* profile = _fragment_ctx->runtime_state()->runtime_profile();
auto* prepare_instance_timer = ADD_TIMER(profile, "FragmentInstancePrepareTime");
auto* prepare_driver_timer =
ADD_CHILD_TIMER_THESHOLD(profile, "prepare-pipeline-driver", "FragmentInstancePrepareTime", 10_ms);
{
SCOPED_TIMER(prepare_instance_timer);
SCOPED_TIMER(prepare_driver_timer);
_fragment_ctx->acquire_runtime_filters();
RETURN_IF_ERROR(_fragment_ctx->prepare_active_drivers());
}
prepare_success = true;
// 获取DriverExecutor对象,然后将任务提交到这里进行执行
DCHECK(_fragment_ctx->enable_resource_group());
auto* executor = _wg->executors()->driver_executor();
RETURN_IF_ERROR(_fragment_ctx->submit_active_drivers(executor));
return Status::OK();
}去掉一些定时器的设置,这个函数的唯一的作用就是将任务提交到DriverExecutor执行。
Status FragmentContext::submit_active_drivers(DriverExecutor* executor) {
for (auto& group : _execution_groups) {
group->attach_driver_executor(executor);
group->submit_active_drivers();
}
return Status::OK();
}将驱动器附加到构建pipeline时一并创建的ExecGroups当中,其实这样就可以确定虽然Fragment被分解为更加细粒度了,但实际上一个Fragment仍然是通过一个DriverExecutor驱动执行的。
void NormalExecutionGroup::submit_active_drivers() {
VLOG_QUERY << "submit_active_drivers:" << to_string();
return for_each_active_driver(_pipelines, [this](const DriverPtr& driver) { _executor->submit(driver.get()); });
}归根到底还是将pipeline提交到executor并使用创建的线程池并发执行。
那么到这里一个查询是分析在FE被拆分为Fragment,以及发送到BE之后是如何再次拆分为pipeline并由DriverExecutor驱动并发执行的流程就比较清楚了。
TODO:还遗留了Pipeline的构建流程,以及DriverExecutor是如何驱动Pipeline执行的流程待补充。