概述
DuckDB 的并行执行系统负责使用多线程执行查询以提高性能。该系统建立在基于管道的执行模型之上,采用事件驱动的任务管理。
系统整体架构如下
查询请求
│
▼
┌─────────────────┐
│ 执行器 │
│ executor.cpp │
└─────────┬───────┘
│
▼
┌─────────────────┐
│ 元管道 │
│meta_pipeline.cpp│
└─────┬───┬───┬───┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│P1 │ │P2 │ │PN │ 管道 (pipeline.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│IE1│ │IE2│ │IEN│ 初始化事件 (pipeline_initialize_event.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
└─────┼─────┘
│
▼
┌─────────────────┐
│ 任务调度器 │
│task_scheduler.cpp│
└─────────┬───────┘
│
▼
┌─────────────────┐
│ 线程池 │
└─┬─────┬─────┬───┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│T1 │ │T2 │ │TM │ 工作线程 (thread_context.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│TE1│ │TE2│ │TEM│ 任务执行器 (task_executor.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│PE1│ │PE2│ │PEM│ 管道执行器 (pipeline_executor.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│ET1│ │ET2│ │ETM│ 执行器任务 (executor_task.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
▼ ▼ ▼
┌───┐ ┌───┐ ┌───┐
│PV1│ │PV2│ │PVM│ 管道事件 (pipeline_event.cpp)
└─┬─┘ └─┬─┘ └─┬─┘
│ │ │
└─────┼─────┘
│
▼
┌─────────────────┐
│ 任务通知器 │
│task_notifier.cpp│
└─────────┬───────┘
│
▼
┌─────────────────┐
│ 管道完成事件 │
│pipeline_complete│
│ _event.cpp │
└─────────┬───────┘
│
▼
┌─────────────────┐
│管道准备结束事件 │
│pipeline_prepare │
│ _finish_event.cpp│
└─────────┬───────┘
│
▼
┌─────────────────┐
│ 管道结束事件 │
│pipeline_finish │
│ _event.cpp │
└─────────┬───────┘
│
▼
返回结果
支撑系统:
中断处理 ········> 任务调度器、任务执行器、管道执行器
(interrupt.cpp)
基础管道事件 ····> 所有事件类型
(base_pipeline_event.cpp)
通用事件系统 ····> 基础管道事件
(event.cpp)
从上述流程当中,我们主要需要关注的组件是:
- executor
- meta pipeline
- task scheduler
- task executor
以及贯穿这些组件的几个重要概念:
- pipeline
- event
核心概念
Pipeline
pipeline是一组可以独立执行的算子序列。从其class的定义可以看出,pipeline从一个数据源(source)中获取数据,中间经过多个算子的计算,最终将数据汇聚到sink。
class Pipeline : public enable_shared_from_this<Pipeline> {
public:
const vector<reference<PhysicalOperator>> &GetIntermediateOperators() const;
optional_ptr<PhysicalOperator> GetSink() {
return sink;
}
optional_ptr<PhysicalOperator> GetSource() {
return source;
}
};
不存在依赖关系的pipeline之间可以并发执行。
时间轴: ──────────────────────────────────────────>
Pipeline1: [Scan Table A] ──> [Filter] ──> [Build Hash]
Pipeline2: [Scan Table B] ──> [Probe Hash] ──> [Output]
重叠执行:
T1 T2 T3 T4 T5
P1: [S] ──[F]──[B]
P2: [S]──[P]──[O]
在一个pipeline内部,还可以按照数据分片进行并发执行。
同一个 Pipeline 的多个实例并行处理不同的数据分片:
Table Scan
│
┌───────┼───────┐
│ │ │
▼ ▼ ▼
Pipeline1 Pipeline2 Pipeline3
(Chunk1-100) (Chunk101-200) (Chunk201-300)
│ │ │
└───────┼───────┘
│
▼
Combine Results
下面以pipeline作为切入点,承上启下,来看任务是如何构建与并发执行的。
MetaPipeline
这也是一个比较重要的概念。pipeline之间可以根据依赖关系来决定执行顺序。但对于有些算子,比如Join算子,会被分割为HashBuild和HashProbe两个阶段对应的pipeline:两者之间存在先后顺序(probe必须等待build完成),并且都围绕同一个HashJoin这个sink展开,需要统一协调。
MetaPipeline正是用来协调和优化这种具有相同sink的一组pipeline的执行的。它主要负责:
- 执行顺序保证: 确保依赖的pipeline按正确顺序执行
- 资源协调: 统一管理共享sink的内存和状态
- 性能优化: 避免不必要的依赖,支持并行执行
- 复杂性管理: 将复杂的pipeline依赖关系封装为清晰的层次结构
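为了有个直观印象,下面给出MetaPipeline主要成员的简化示意。这只是依据上文出现过的构造方法和调用整理出来的说明性草图,并非完整源码,成员名以实际代码为准:
// 简化示意:MetaPipeline管理一组共享同一个sink的pipeline
class MetaPipeline {
	Executor &executor;                         // 所属的执行器
	PipelineBuildState &state;                  // 构建pipeline时的共享状态
	optional_ptr<PhysicalOperator> sink;        // 本组pipeline共享的sink(root为nullptr)
	const MetaPipelineType type;                // REGULAR或JOIN_BUILD
	bool recursive_cte;                         // 是否处于recursive CTE中
	vector<shared_ptr<Pipeline>> pipelines;     // 本MetaPipeline内的pipeline,第0个为base pipeline
	vector<shared_ptr<MetaPipeline>> children;  // 子MetaPipeline,必须先于本MetaPipeline完成
	idx_t next_batch_index;                     // 为pipeline分配batch index
};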
Pipeline构建
Executor接收一个物理执行计划,将其拆分为pipeline并完成执行。构建逻辑就在其初始化方法中:
void Executor::Initialize(PhysicalOperator &plan) {
Reset();
InitializeInternal(plan);
}
void Executor::InitializeInternal(PhysicalOperator &plan) {
auto &scheduler = TaskScheduler::GetScheduler(context);
{
lock_guard<mutex> elock(executor_lock);
physical_plan = &plan;
this->profiler = ClientData::Get(context).profiler;
profiler->Initialize(plan);
this->producer = scheduler.CreateProducer();
// 构建root meta pipeline,以nullptr为sink
PipelineBuildState state;
auto root_pipeline = make_shared_ptr<MetaPipeline>(*this, state, nullptr);
// 这里是pipeline构建的核心方法
root_pipeline->Build(*physical_plan);
root_pipeline->Ready();
// 将CTE pipelines也准备好
for (auto &rec_cte_ref : recursive_ctes) {
auto &rec_cte = rec_cte_ref.get().Cast<PhysicalRecursiveCTE>();
rec_cte.recursive_meta_pipeline->Ready();
}
// 这里实际上就是获取了root meta pipeline中的pipeline集合
root_pipeline->GetPipelines(root_pipelines, false);
root_pipeline_idx = 0;
// 收集meta pipeline,跳过了root meta pipeline。
// 实际上,为了启动构建而初始化的这个以nullptr为sink的root meta pipeline并不参与调度,所以这里会跳过它,将剩余的meta pipeline作为实际需要调度的meta pipeline
vector<shared_ptr<MetaPipeline>> to_schedule;
root_pipeline->GetMetaPipelines(to_schedule, true, true);
// PipelineCompleteEvent的数量和meta pipelines的数量相等,这里需要设置一下
total_pipelines = to_schedule.size();
// 从root meta pipelines开始递归收集所有的pipelines用于进度条和校验
root_pipeline->GetPipelines(pipelines, true);
// 最终,进行校验和调度
VerifyPipelines();
ScheduleEvents(to_schedule);
}
}
重点需要关注的是这个root meta pipeline的Build方法,这是将物理执行计划转为pipeline的核心方法。
初始化meta pipeline
PipelineBuildState state;
auto root_pipeline = make_shared_ptr<MetaPipeline>(*this, state, nullptr);
// 构造方法签名
MetaPipeline(Executor &executor, PipelineBuildState &state, optional_ptr<PhysicalOperator> sink,
MetaPipelineType type = MetaPipelineType::REGULAR);
// 构造方法实现
MetaPipeline::MetaPipeline(Executor &executor_p, PipelineBuildState &state_p, optional_ptr<PhysicalOperator> sink_p,
MetaPipelineType type_p)
: executor(executor_p), state(state_p), sink(sink_p), type(type_p), recursive_cte(false), next_batch_index(0) {
CreatePipeline();
}
// 初始化一个pipeline
Pipeline &MetaPipeline::CreatePipeline() {
pipelines.emplace_back(make_shared_ptr<Pipeline>(executor));
state.SetPipelineSink(*pipelines.back(), sink, next_batch_index++);
return *pipelines.back();
}
在后续递归过程中,以这个root meta pipeline作为起点进行填充,按照sink算子分割出pipeline并设置依赖关系。
Build过程
这里使用的pipelines.back()就是上面创建的虚拟pipeline,以此作为起点,开始物理计划树的遍历。
void MetaPipeline::Build(PhysicalOperator &op) {
D_ASSERT(pipelines.size() == 1);
D_ASSERT(children.empty());
op.BuildPipelines(*pipelines.back(), *this);
}
来看物理算子BuildPipelines的默认实现:
void PhysicalOperator::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) {
op_state.reset();
auto &state = meta_pipeline.GetState();
// 非sink,并且没有child,那么当前算子就可以作为当前pipeline的source
if (!IsSink() && children.empty()) {
// Operator is a source.
state.SetPipelineSource(current, *this);
return;
}
if (children.size() != 1) {
throw InternalException("Operator not supported in BuildPipelines");
}
// 当前算子是sink,那么这个sink作为当前pipeline的source,并创建一个新的meta pipeline开启新的递归。这个新的child meta pipeline是当前pipeline的依赖
if (IsSink()) {
// Operator is a sink.
sink_state.reset();
// It becomes the data source of the current pipeline.
state.SetPipelineSource(current, *this);
// Create a new pipeline starting at the child.
auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this);
child_meta_pipeline.Build(children[0].get());
return;
}
// Recurse into the child.
// 非source、非sink的算子正常加入pipeline当中
state.AddPipelineOperator(current, *this);
children[0].get().BuildPipelines(current, meta_pipeline);
}
从上面的流程可以看出,整个过程以sink类型的算子作为pipeline的分割点,递归遍历物理计划,创建每个pipeline并维护其依赖关系。sink算子既是child meta pipeline的sink,也是依赖这个child meta pipeline的父pipeline的source,两者通过sink算子连接起依赖关系。
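对于Join这类算子,情况略有不同:probe侧会继续留在当前pipeline中(HashJoin作为中间算子),build侧则通过CreateChildMetaPipeline创建一个JOIN_BUILD类型的子MetaPipeline,并以HashJoin作为其sink。下面是一个简化示意,其中PhysicalHashJoinLike是为说明而虚构的类名,真实逻辑位于对应Join算子的BuildPipelines实现中:
// 简化示意(非源码原文):Join类算子构建pipeline的大致方式
void PhysicalHashJoinLike::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
	auto &state = meta_pipeline.GetState();
	// build侧:创建JOIN_BUILD类型的child meta pipeline,以本算子作为sink
	auto &child_meta_pipeline =
	    meta_pipeline.CreateChildMetaPipeline(current, *this, MetaPipelineType::JOIN_BUILD);
	child_meta_pipeline.Build(children[1].get());
	// probe侧:本算子作为中间算子留在当前pipeline中,继续沿probe侧子树向下递归
	state.AddPipelineOperator(current, *this);
	children[0].get().BuildPipelines(current, meta_pipeline);
}
上面用到的CreateChildMetaPipeline负责创建子MetaPipeline并维护依赖关系: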
MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline ¤t, PhysicalOperator &op, MetaPipelineType type) {
// 创建child meta pipeline
children.push_back(make_shared_ptr<MetaPipeline>(executor, state, &op, type));
// 维护依赖关系
auto &child_meta_pipeline = *children.back().get();
// store the parent
child_meta_pipeline.parent = ¤t;
// child MetaPipeline must finish completely before this MetaPipeline can start
current.AddDependency(child_meta_pipeline.GetBasePipeline());
// child meta pipeline is part of the recursive CTE too
child_meta_pipeline.recursive_cte = recursive_cte;
return child_meta_pipeline;
}
给定下面一个物理算子树示例,来看构建出的pipeline:
ProjectionOperator (Regular)
|
HashAggregateOperator (Sink)
|
FilterOperator (Regular)
|
HashJoinOperator (Sink)
/ \
/ \
(左子树) (右子树)
ProjectionOperator SortOperator (Sink)
| |
FilterOperator ProjectionOperator
| |
TableScanOperator HashJoinOperator (Sink)
(Source) / \
/ \
TableScanOperator FilterOperator
(Source) |
TableScanOperator
(Source)
以sink算子为分割点,构建pipeline并设置依赖关系:
╔═══════════════════════════════════════════════════════════╗
║ MetaPipeline 0 ║
║ (REGULAR TYPE) ║
║ ┌─ Pipeline 0 ─────────────────────────────────┐ ║
║ ┆ ProjectionOperator (Regular) ┆ ║
║ ┆ | ┆ ║
║ ┆ ▼ ┆ ║
║ ┆ ╔══════════════════════════════╗ ┆ ║
║ └─▶║ HashAggregateOperator ║◀───────────┘ ║
║ ║ (Sink - Pipeline边界) ║ ║
║ ╚══════════════════════════════╝ ║
╚════════════════════▲═══════════════════════════════════════╝
│ 依赖关系: 等待MetaPipeline 1完成
╔═══════════════════════════════════════════════════════════╗
║ MetaPipeline 1 ║
║ (REGULAR TYPE) ║
║ ┌─ Pipeline 1 ─────┴─────────────────────────────┐ ║
║ ┆ FilterOperator (Regular) ┆ ║
║ ┆ | ┆ ║
║ ┆ ▼ ┆ ║
║ ┆ ╔══════════════════════════════╗ ┆ ║
║ └─▶║ HashJoinOperator ║◀─────────────┘ ║
║ ║ (Sink - Pipeline边界) ║ ║
║ ╚══════════════════════════════╝ ║
╚════════════════▲═══════════════▲═══════════════════════════╝
│ │ 依赖关系: 等待子MetaPipeline完成
╔════════════════════════════════╗ ╔═══════════════════════════════╗
║ MetaPipeline 2 ║ ║ MetaPipeline 3 ║
║ (REGULAR TYPE) ║ ║ (REGULAR TYPE) ║
║┌─ Pipeline 2 ─────────┐ ║ ║ ┌─ Pipeline 3 ──┐ ║
║┆ ProjectionOperator ┆ ║ ║ ┆ [] ┆ ║
║┆ | ┆ ║ ║ ┆ ▲ ┆ ║
║┆ ▼ ┆ ║ ║ ┆ │ ┆ ║
║┆ FilterOperator ┆ ║ ║ ┆ ╔═══════════════════╗ ┆ ║
║┆ | ┆ ║ ║ └▶║ SortOperator ║◀────┘ ║
║┆ ▼ ┆ ║ ║ ║ (Sink - 边界) ║ ║
║┆ ◉ TableScanOperator ┆ ║ ║ ╚═══════════════════╝ ║
║┆ (Source) ┆ ║ ╚═══════════════▲═══════════════╝
║└───────────────────────┘ ║ │ 依赖关系
╚═════════════════════════════════╝ ╔═══════════════════════════════╗
║ MetaPipeline 4 ║
║ (REGULAR TYPE) ║
║┌─ Pipeline 4 ─┴─────────────┐ ║
║┆ ProjectionOperator ┆ ║
║┆ | ┆ ║
║┆ ▼ ┆ ║
║┆ ╔══════════════════════════╗ ┆ ║
║└▶║ HashJoinOperator ║◀┘ ║
║ ║ (Sink - Pipeline边界) ║ ║
║ ╚══════════════════════════╝ ║
╚═══════════▲══════════▲═════════╝
│ │ 依赖关系
╔══════════════════════════════════════╗ ╔═══════════════════════╗
║ MetaPipeline 5 ║ ║ MetaPipeline 6 ║
║ (JOIN_BUILD TYPE) ║ ║ (REGULAR TYPE) ║
║ ┌─ Pipeline 5 ────────┐ ║ ║┌─ Pipeline 6 ──┐ ║
║ ┆ ◉ TableScanOperator ┆ ║ ║┆ FilterOperator ┆ ║
║ ┆ (Source) ┆ ║ ║┆ | ┆ ║
║ └─────────────────────┘ ║ ║┆ ▼ ┆ ║
╚══════════════════════════════════════╝ ║┆◉ TableScanOperator┆ ║
║┆ (Source) ┆ ║
║└────────────────┘ ║
╚═══════════════════════╝
MetaPipeline执行依赖图:
═══════════════════════
MetaPipeline 5 (JOIN_BUILD) ────┐
├─▶ MetaPipeline 4 ──▶ MetaPipeline 3 ──▶ MetaPipeline 1 ──▶ MetaPipeline 0
MetaPipeline 6 (REGULAR) ────┘ ▲ ▲
│ │
MetaPipeline 2 ──────────────┘
执行阶段详细说明:
══════════════════
阶段1: JOIN_BUILD阶段 (并行执行)
├─ MetaPipeline 5: 为右子树HashJoin构建哈希表
│ └─ Pipeline 5: TableScan → HashJoin Build
├─ MetaPipeline 6: 为右子树HashJoin提供探测数据
│ └─ Pipeline 6: TableScan → Filter → HashJoin Probe
└─ 内存协调: 统一管理右子树HashJoin的哈希表内存
阶段2: 右子树完成,数据流向排序
└─ MetaPipeline 4:
└─ Pipeline 4: Projection → Sort
阶段3: 排序完成,准备上层JOIN_BUILD
├─ MetaPipeline 2: 构建上层HashJoin哈希表
│ └─ Pipeline 2: TableScan → Projection → Filter → HashJoin Build
└─ MetaPipeline 3: 提供排序后的探测数据
└─ Pipeline 3: Sort → HashJoin Probe
阶段4: 上层Join完成,进行聚合
└─ MetaPipeline 1:
└─ Pipeline 1: Filter → HashAggregate
阶段5: 聚合完成,最终投影
└─ MetaPipeline 0:
└─ Pipeline 0: Projection
MetaPipeline特殊属性说明:
═══════════════════════
类型标识:
├─ REGULAR TYPE: 常规pipeline集合,按依赖顺序执行
└─ JOIN_BUILD TYPE: Join构建阶段,需要特殊的内存协调和执行同步
依赖管理:
├─ 层级依赖: 子MetaPipeline必须完全完成后父MetaPipeline才能开始
├─ 同步机制: JOIN_BUILD类型有特殊的Combine→PrepareFinalize→Finalize执行序列
└─ 内存协调: 共享sink的pipeline通过MetaPipeline统一管理内存使用
并行优化:
├─ 同级MetaPipeline可以并行执行 (如MetaPipeline 5和6)
├─ 不同MetaPipeline之间的依赖最小化
└─ JOIN_BUILD类型支持溢出到磁盘等高级内存管理策略
Pipeline event驱动构建
上一步构建pipeline的过程中,使用MetaPipeline来管理pipeline之间的依赖关系,接下来MetaPipeline同样作为调度的对象。
整个并行执行系统是事件驱动的,所以需要先将pipeline转为事件驱动的模型。
- 将MetaPipeline封装到ScheduleEventData中,进入调度流程
void Executor::ScheduleEvents(const vector<shared_ptr<MetaPipeline>> &meta_pipelines) {
ScheduleEventData event_data(meta_pipelines, events, true);
ScheduleEventsInternal(event_data);
}
先从整体上看一遍ScheduleEventsInternal的实现:
- 为查询的所有的Pipeline创建整个执行过程中的事件。
- 处理好事件之间的依赖关系
- 校验事件图保证不存在循环依赖,这里只在DEBUG模式下开启
- 类似拓扑排序,从无依赖的事件开始调度
下面结合代码一步步深入上面的流程。
event map初始化
遍历MetaPipeline,创建好其内部的pipeline的事件
void Executor::ScheduleEventsInternal(ScheduleEventData &event_data) {
...
// create all the required pipeline events
for (auto &meta_pipeline : event_data.meta_pipelines) {
SchedulePipeline(meta_pipeline, event_data);
}
....
}
这一步的核心函数为SchedulePipeline。
1. 先为MetaPipeline的base pipeline(0号pipeline)创建好事件序列。每个pipeline有5个基础事件,统一放在PipelineEventStack中(结构见下方示意),并按照Initialize → Event → PrepareFinish → Finish → Complete建立依赖关系:
- PipelineInitializeEvent
- PipelineEvent
- PipelinePrepareFinishEvent
- PipelineFinishEvent
- PipelineCompleteEvent
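这几个事件的引用被打包在PipelineEventStack中,其结构大致如下(简化示意,字段以实际源码为准):
// 简化示意:一个pipeline在调度期间对应的一组事件引用
struct PipelineEventStack {
	Event &pipeline_initialize_event;      // 初始化
	Event &pipeline_event;                 // 实际的数据处理
	Event &pipeline_prepare_finish_event;  // sink的PrepareFinalize
	Event &pipeline_finish_event;          // sink的Finalize
	Event &pipeline_complete_event;        // 整体完成,触发下游pipeline
};
base pipeline的这组事件的创建及其内部依赖设置如下: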
auto base_pipeline = meta_pipeline->GetBasePipeline();
auto base_initialize_event = make_shared_ptr<PipelineInitializeEvent>(base_pipeline);
auto base_event = make_shared_ptr<PipelineEvent>(base_pipeline);
auto base_prepare_finish_event = make_shared_ptr<PipelinePrepareFinishEvent>(base_pipeline);
auto base_finish_event = make_shared_ptr<PipelineFinishEvent>(base_pipeline);
auto base_complete_event =
make_shared_ptr<PipelineCompleteEvent>(base_pipeline->executor, event_data.initial_schedule);
PipelineEventStack base_stack(*base_initialize_event, *base_event, *base_prepare_finish_event, *base_finish_event,
*base_complete_event);
events.push_back(std::move(base_initialize_event));
events.push_back(std::move(base_event));
events.push_back(std::move(base_prepare_finish_event));
events.push_back(std::move(base_finish_event));
events.push_back(std::move(base_complete_event));
// dependencies: initialize -> event -> prepare finish -> finish -> complete
base_stack.pipeline_event.AddDependency(base_stack.pipeline_initialize_event);
base_stack.pipeline_prepare_finish_event.AddDependency(base_stack.pipeline_event);
base_stack.pipeline_finish_event.AddDependency(base_stack.pipeline_prepare_finish_event);
base_stack.pipeline_complete_event.AddDependency(base_stack.pipeline_finish_event);
2. 处理MetaPipeline中的其他pipeline的事件序列。这里有一个关键的概念:finish group。下面的处理逻辑按照pipeline的finish event的归属分为三种情况:
- 共享finish event的组。这些pipeline共享一个finish event。组内所有的pipeline都完成之后才会触发finish event
- 独立finish event。pipeline有自己独立的finish event。
- 默认共享base pipeline的finish event。这是大多数情况,也就是说其实大多数情况下MetaPipeline都是一个共享base pipeline的finish event的组。
vector<shared_ptr<Pipeline>> pipelines;
meta_pipeline->GetPipelines(pipelines, false);
for (idx_t i = 1; i < pipelines.size(); i++) { // loop starts at 1 because 0 is the base pipeline
auto &pipeline = pipelines[i];
D_ASSERT(pipeline);
// create events/stack for this pipeline
auto pipeline_event = make_shared_ptr<PipelineEvent>(pipeline);
auto finish_group = meta_pipeline->GetFinishGroup(*pipeline);
if (finish_group) {
// this pipeline is part of a finish group
const auto group_entry = event_map.find(*finish_group.get());
D_ASSERT(group_entry != event_map.end());
auto &group_stack = group_entry->second;
// finish group中的pipeline共享一组的finish event
PipelineEventStack pipeline_stack(base_stack.pipeline_initialize_event, *pipeline_event,
group_stack.pipeline_prepare_finish_event,
group_stack.pipeline_finish_event, base_stack.pipeline_complete_event);
// dependencies: base_finish -> pipeline_event -> group_prepare_finish
pipeline_stack.pipeline_event.AddDependency(base_stack.pipeline_finish_event);
group_stack.pipeline_prepare_finish_event.AddDependency(pipeline_stack.pipeline_event);
// add pipeline stack to event map
event_map.insert(make_pair(reference<Pipeline>(*pipeline), pipeline_stack));
} else if (meta_pipeline->HasFinishEvent(*pipeline)) {
// this pipeline has its own finish event (despite going into the same sink - Finalize twice!)
// 有独立finish event的pipeline
auto pipeline_prepare_finish_event = make_shared_ptr<PipelinePrepareFinishEvent>(pipeline);
auto pipeline_finish_event = make_shared_ptr<PipelineFinishEvent>(pipeline);
PipelineEventStack pipeline_stack(base_stack.pipeline_initialize_event, *pipeline_event,
*pipeline_prepare_finish_event, *pipeline_finish_event,
base_stack.pipeline_complete_event);
events.push_back(std::move(pipeline_prepare_finish_event));
events.push_back(std::move(pipeline_finish_event));
// dependencies:
// base_finish -> pipeline_event -> pipeline_prepare_finish -> pipeline_finish -> base_complete
pipeline_stack.pipeline_event.AddDependency(base_stack.pipeline_finish_event);
pipeline_stack.pipeline_prepare_finish_event.AddDependency(pipeline_stack.pipeline_event);
pipeline_stack.pipeline_finish_event.AddDependency(pipeline_stack.pipeline_prepare_finish_event);
base_stack.pipeline_complete_event.AddDependency(pipeline_stack.pipeline_finish_event);
// add pipeline stack to event map
event_map.insert(make_pair(reference<Pipeline>(*pipeline), pipeline_stack));
} else {
// 共享base pipeline的finish event
PipelineEventStack pipeline_stack(base_stack.pipeline_initialize_event, *pipeline_event,
base_stack.pipeline_prepare_finish_event,
base_stack.pipeline_finish_event, base_stack.pipeline_complete_event);
// dependencies: base_initialize -> pipeline_event -> base_prepare_finish
pipeline_stack.pipeline_event.AddDependency(base_stack.pipeline_initialize_event);
base_stack.pipeline_prepare_finish_event.AddDependency(pipeline_stack.pipeline_event);
// add pipeline stack to event map
event_map.insert(make_pair(reference<Pipeline>(*pipeline), pipeline_stack));
}
events.push_back(std::move(pipeline_event));
}
从上面的代码当中可以看到,每个Pipeline都有自己的pipeline event,并且MetaPipeline中的所有pipeline共享base pipeline的initialize event和complete event,finish event则按照上面的三种场景对应使用。
3. 特殊处理TableScan算子。一些算子需要在调度的时候立即初始化,来避免延迟初始化带来的性能问题:
for (auto &pipeline : pipelines) {
auto source = pipeline->GetSource();
if (source->type == PhysicalOperatorType::TABLE_SCAN) {
auto &table_function = source->Cast<PhysicalTableScan>();
if (table_function.function.global_initialization == TableFunctionInitialization::INITIALIZE_ON_SCHEDULE) {
// certain functions have to be eagerly initialized during scheduling
// if that is the case - initialize the function here
pipeline->ResetSource(true);
}
}
}
到此为止,遍历完每一个MetaPipeline之后,所有Pipeline的PipelineEventStack都创建好了,并设置了其内部的依赖链。接下来就是设置pipeline之间的event依赖关系。
pipeline间event依赖构建
pipeline之间存在依赖关系,对应的这些事件也要存在依赖关系。
这个构建过程也分为几步
- 处理pipeline之间complete event依赖。当前pipeline必须等待依赖的pipeline的complete event
auto &event_map = event_data.event_map;
for (auto &entry : event_map) {
auto &pipeline = entry.first.get();
for (auto &dependency : pipeline.dependencies) {
auto dep = dependency.lock();
D_ASSERT(dep);
auto event_map_entry = event_map.find(*dep);
if (event_map_entry == event_map.end()) {
continue;
}
D_ASSERT(event_map_entry != event_map.end());
auto &dep_entry = event_map_entry->second;
entry.second.pipeline_event.AddDependency(dep_entry.pipeline_complete_event);
}
}
- 建立同一MetaPipeline内pipeline之间的pipeline event依赖
for (auto &meta_pipeline : event_data.meta_pipelines) {
for (auto &entry : meta_pipeline->GetDependencies()) {
auto &pipeline = entry.first.get();
auto root_entry = event_map.find(pipeline);
D_ASSERT(root_entry != event_map.end());
auto &pipeline_stack = root_entry->second;
for (auto &dependency : entry.second) {
auto event_entry = event_map.find(dependency);
D_ASSERT(event_entry != event_map.end());
auto &dependency_stack = event_entry->second;
pipeline_stack.pipeline_event.AddDependency(dependency_stack.pipeline_event);
}
}
}
- 处理特殊类型的MetaPipeline,例如JOIN_BUILD:同一个parent下的多个join build之间需要额外的同步
for (auto &meta_pipeline : event_data.meta_pipelines) {
vector<shared_ptr<MetaPipeline>> children;
meta_pipeline->GetMetaPipelines(children, false, true);
for (auto &child1 : children) {
if (child1->Type() != MetaPipelineType::JOIN_BUILD) {
continue; // We only want to do this for join builds
}
auto &child1_base = *child1->GetBasePipeline();
auto child1_entry = event_map.find(child1_base);
D_ASSERT(child1_entry != event_map.end());
for (auto &child2 : children) {
if (child2->Type() != MetaPipelineType::JOIN_BUILD || RefersToSameObject(*child1, *child2)) {
continue; // We don't want to depend on itself
}
if (!RefersToSameObject(*child1->GetParent(), *child2->GetParent())) {
continue; // Different parents, skip
}
auto &child2_base = *child2->GetBasePipeline();
auto child2_entry = event_map.find(child2_base);
D_ASSERT(child2_entry != event_map.end());
// all children PrepareFinalize must wait until all Combine
child1_entry->second.pipeline_prepare_finish_event.AddDependency(child2_entry->second.pipeline_event);
// all children Finalize must wait until all PrepareFinalize
child1_entry->second.pipeline_finish_event.AddDependency(
child2_entry->second.pipeline_prepare_finish_event);
}
}
}
检查是否存在循环依赖
在DEBUG模式下会判断是否存在循环依赖
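这一步概念上等价于在事件依赖图上做一次环检测,思路与DFS三色标记/拓扑排序相同。下面是一段脱离源码的概念示意(真实校验逻辑位于Executor中,仅在DEBUG模式下启用):
#include <cstddef>
#include <vector>

// 概念示意(非DuckDB源码):用DFS三色标记检测事件依赖图中的环
// deps[i]表示事件i所依赖的事件下标;color: 0=未访问, 1=访问中, 2=已完成
static bool HasCycle(std::size_t idx, const std::vector<std::vector<std::size_t>> &deps, std::vector<int> &color) {
	color[idx] = 1;
	for (auto dep : deps[idx]) {
		if (color[dep] == 1) {
			return true; // 沿依赖链回到了"访问中"的事件,说明存在循环依赖
		}
		if (color[dep] == 0 && HasCycle(dep, deps, color)) {
			return true;
		}
	}
	color[idx] = 2;
	return false;
}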
启动event
从没有任何依赖的event开始启动调度。
for (auto &event : events) {
if (!event->HasDependencies()) {
event->Schedule();
}
}
示例
查询: SELECT * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t2.id = t3.id
物理算子树:
            ResultCollector
                  |
          HashJoin2 (Probe)
           /             \
      HashJoin1       TableScan(t3)
      /       \       (HashJoin2的Build侧)
 TableScan   TableScan
   (t1)        (t2)
  (Probe侧)  (HashJoin1的Build侧)
MetaPipeline层次结构:
═══════════════════════════════════════════════════════════════════════════
┌─ Root MetaPipeline (REGULAR) ───────────────────────────────────────────┐
│ │
│ Pipeline R1: HashJoin1 → HashJoin2(Probe) → ResultCollector │
│ │
│ R1 Events: │
│ ┌─ R1_Initialize ──▶ R1_Event ──▶ R1_PrepFinish ──▶ R1_Finish ──▶ R1_Complete
│ ▲ │
│ │ 依赖 │
└───────────────────────┼──────────────────────────────────────────────────┘
│ R1_Event 依赖 M2_Complete
│
┌─ Level1 MetaPipeline (REGULAR) ────────────────────────────────────────┐
│ │
│ Pipeline M1: TableScan(t1) → HashJoin1(Probe) │
│ │
│ M1 Events: │
│ ┌─ M1_Initialize ──▶ M1_Event ──▶ M1_PrepFinish ──▶ M1_Finish ──▶ M1_Complete
│ ▲ │
│ │ 依赖 │
└───────────────────────┼──────────────────────────────────────────────────┘
│ M1_Event 依赖 C1_Complete
│
┌─ Level2 MetaPipeline (JOIN_BUILD) ─────────────────────────────────────┐
│ │
│ Pipeline C1: TableScan(t2) → HashJoin1(Build) │
│ │
│ C1 Events: │
│ ┌─ C1_Initialize ──▶ C1_Event ──▶ C1_PrepFinish ──▶ C1_Finish ──▶ C1_Complete
│ │
└──────────────────────────────────────────────────────────────────────────┘
│
┌─ Level2 MetaPipeline (JOIN_BUILD) ─────────────────────────────────────┐
│ │
│ Pipeline M2: TableScan(t3) → HashJoin2(Build) │
│ │
│ M2 Events: │
│ ┌─ M2_Initialize ──▶ M2_Event ──▶ M2_PrepFinish ──▶ M2_Finish ──▶ M2_Complete
│ │
└──────────────────────────────────────────────────────────────────────────┘
完整的事件依赖关系:
═══════════════════════════════════════════════════════════════════════════
层级依赖 (跨MetaPipeline的Complete依赖):
M1_Event 依赖 C1_Complete (第一个JOIN的Build→Probe)
R1_Event 依赖 M2_Complete (第二个JOIN的Build→Probe)
执行时序图:
时间 ────────────────────────────────────────────────────────────────────▶
C1: Init ──▶ Event ──▶ PrepFinish ──▶ Finish ──▶ Complete
│
M2: Init ──▶ Event ──▶ PrepFinish ──▶ Finish ──▶ Complete
│
▼
M1: Init ──▶ Event ──▶ PrepFinish ──▶ Finish ──▶ Complete
│
▼
R1: Init ──▶ Event ──▶ ...
说明:
1. C1和M2可以并行执行 (不同的JOIN Build阶段)
2. M1必须等待C1完全完成 (第一个JOIN的依赖)
3. R1必须等待M2完全完成 (第二个JOIN的依赖)
4. 每个MetaPipeline内部遵循标准的5阶段生命周期
Pipeline调度执行
没有依赖的event一定是无依赖pipeline的initialize event。那么就从它开始,来了解event是如何驱动完成所有pipeline的执行的。
PipelineInitializeEvent调度
Schedule方法做的事情很简单:创建出PipelineInitializeTask任务,然后把任务交给TaskScheduler进行调度。
void PipelineInitializeEvent::Schedule() {
// needs to spawn a task to get the chain of tasks for the query plan going
vector<shared_ptr<Task>> tasks;
tasks.push_back(make_uniq<PipelineInitializeTask>(*pipeline, shared_from_this()));
SetTasks(std::move(tasks));
}
SetTasks所做的事就是将任务提交到TaskScheduler的队列当中:
void Event::SetTasks(vector<shared_ptr<Task>> tasks) {
auto &ts = TaskScheduler::GetScheduler(executor.context);
D_ASSERT(total_tasks == 0);
D_ASSERT(!tasks.empty());
this->total_tasks = tasks.size();
for (auto &task : tasks) {
ts.ScheduleTask(executor.GetToken(), std::move(task));
}
}
void TaskScheduler::ScheduleTask(ProducerToken &token, shared_ptr<Task> task) {
// Enqueue a task for the given producer token and signal any sleeping threads
queue->Enqueue(token, std::move(task));
}
视角转到TaskScheduler,由专门的线程负责从队列中取出任务并执行:
void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
#ifndef DUCKDB_NO_THREADS
static constexpr const int64_t INITIAL_FLUSH_WAIT = 500000; // initial wait time of 0.5s (in mus) before flushing
auto &config = DBConfig::GetConfig(db);
shared_ptr<Task> task;
// loop until the marker is set to false
while (*marker) {
// 忽略阻塞
// ......
if (queue->q.try_dequeue(task)) {
auto process_mode = config.options.scheduler_process_partial ? TaskExecutionMode::PROCESS_PARTIAL
: TaskExecutionMode::PROCESS_ALL;
// 执行任务
auto execute_result = task->Execute(process_mode);
// 根据执行结果来决定如何执行下一步调度
switch (execute_result) {
case TaskExecutionResult::TASK_FINISHED:
case TaskExecutionResult::TASK_ERROR:
task.reset();
break;
case TaskExecutionResult::TASK_NOT_FINISHED: {
// task is not finished - reschedule immediately
auto &token = *task->token;
queue->Enqueue(token, std::move(task));
break;
}
case TaskExecutionResult::TASK_BLOCKED:
task->Deschedule();
task.reset();
break;
}
}
}
// ......
}
执行任务就是调用task的Execute方法,ExecutorTask中有一个统一的实现:
do {
TaskNotifier task_notifier {context};
thread_context->profiler.StartOperator(op);
// to allow continuous profiling, always execute in small steps
result = ExecuteTask(TaskExecutionMode::PROCESS_PARTIAL);
thread_context->profiler.EndOperator(nullptr);
executor.Flush(*thread_context);
} while (mode == TaskExecutionMode::PROCESS_ALL && result == TaskExecutionResult::TASK_NOT_FINISHED);
return result;
具体的逻辑由每个子类自己实现的ExecuteTask方法决定。对于PipelineInitializeTask,就是初始化一些状态,并开启下一轮调度:
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
pipeline.ResetSink();
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}
在event->FinishTask中会更新事件的状态,并尝试启动依赖本event的其他event:
void Event::FinishTask() {
D_ASSERT(finished_tasks.load() < total_tasks.load());
idx_t current_tasks = total_tasks;
idx_t current_finished = ++finished_tasks;
D_ASSERT(current_finished <= current_tasks);
if (current_finished == current_tasks) {
// 本event的所有task都结束,走finish流程
Finish();
}
}
void Event::Finish() {
D_ASSERT(!finished);
FinishEvent();
finished = true;
// finished processing the pipeline, now we can schedule pipelines that depend on this pipeline
for (auto &parent_entry : parents) {
auto parent = parent_entry.lock();
if (!parent) { // LCOV_EXCL_START
continue;
} // LCOV_EXCL_STOP
// 更新依赖本event的parent event
// mark a dependency as completed for each of the parents
parent->CompleteDependency();
}
FinalizeFinish();
}
void Event::CompleteDependency() {
idx_t current_finished = ++finished_dependencies;
D_ASSERT(current_finished <= total_dependencies);
if (current_finished == total_dependencies) {
// all dependencies have been completed: schedule the event
// 所有依赖的事件都已经完成,开始调度本event
D_ASSERT(total_tasks == 0);
Schedule();
if (total_tasks == 0) {
Finish();
}
}
}
那么到了这里,从PipelineInitializeEvent的调度流程当中,我们就可以看出整体的调度方式:
- 由event创建对应的Task,提交到TaskScheduler队列当中,交由调度线程处理
- 每个Task结束时更新event的状态;当event的所有task完成后event finish,更新parent event的状态并触发下一批event的调度
- 循环上述过程,直到所有的event调度执行结束。那么这个查询任务也执行完成了。
不过由于这里只是一个初始化的event,所以看不出什么执行计算的逻辑,继续看后续event的调度。
PipelineEvent调度
到了PipelineEvent就涉及到pipeline的执行了,其Schedule直接调用pipeline的Schedule方法:
void Pipeline::Schedule(shared_ptr<Event> &event) {
D_ASSERT(ready);
D_ASSERT(sink);
Reset();
if (!ScheduleParallel(event)) {
// could not parallelize this pipeline: push a sequential task instead
ScheduleSequentialTask(event);
}
}
pipeline支持并行就并行执行,不支持就串行执行。两者的本质都是创建PipelineTask任务提交到TaskScheduler队列,只是并行执行可以创建多个任务同时执行。
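并行路径ScheduleParallel的大致逻辑如下(简化示意,非源码原文,省略了batch index、分区信息等细节):只有source、sink和所有中间算子都支持并行时,才按照source估算的并行度创建多个PipelineTask。
// 简化示意:能否并行取决于source/sink/各算子,以及source给出的并行度
bool Pipeline::ScheduleParallel(shared_ptr<Event> &event) {
	if (!sink->ParallelSink() || !source->ParallelSource()) {
		return false;
	}
	for (auto &op : operators) {
		if (!op.get().ParallelOperator()) {
			return false;
		}
	}
	// 由source的全局状态估算最大并行度,创建对应数量的PipelineTask提交给调度器
	idx_t max_threads = source_state->MaxThreads();
	return LaunchScanTasks(event, max_threads);
}
不支持并行时则退化为单个任务: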
void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) {
vector<shared_ptr<Task>> tasks;
tasks.push_back(make_uniq<PipelineTask>(*this, event));
event->SetTasks(std::move(tasks));
}
后续的提交与调度流程和前面相同,不再赘述,直接看PipelineTask的ExecuteTask实现即可:
TaskExecutionResult PipelineTask::ExecuteTask(TaskExecutionMode mode) {
if (!pipeline_executor) {
pipeline_executor = make_uniq<PipelineExecutor>(pipeline.GetClientContext(), pipeline);
}
pipeline_executor->SetTaskForInterrupts(shared_from_this());
if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
auto res = pipeline_executor->Execute(PARTIAL_CHUNK_COUNT);
switch (res) {
case PipelineExecuteResult::NOT_FINISHED:
return TaskExecutionResult::TASK_NOT_FINISHED;
case PipelineExecuteResult::INTERRUPTED:
return TaskExecutionResult::TASK_BLOCKED;
case PipelineExecuteResult::FINISHED:
break;
}
} else {
auto res = pipeline_executor->Execute();
switch (res) {
case PipelineExecuteResult::NOT_FINISHED:
throw InternalException("Execute without limit should not return NOT_FINISHED");
case PipelineExecuteResult::INTERRUPTED:
return TaskExecutionResult::TASK_BLOCKED;
case PipelineExecuteResult::FINISHED:
break;
}
}
event->FinishTask();
pipeline_executor.reset();
return TaskExecutionResult::TASK_FINISHED;
}
pipeline的执行按照传入的执行模式分为两种:
- 执行一部分数据
- 执行全部数据
最关键的是,pipeline的执行逻辑由构造出来的PipelineExecutor对象负责。这里不再深入介绍,总体的逻辑就是从source读取数据chunk,遍历pipeline中的算子列表调用其Execute方法进行计算,最终把结果发送到sink。
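用一段概念性的简化代码示意这个过程(非源码原文,真实实现还要处理中断、算子缓存、部分执行等细节;RunOperator/RunSink为示意用的假想辅助函数,对应真实代码中对算子Execute与sink Sink的调用):
// 概念示意:以chunk为单位推动数据,source -> 中间算子 -> sink
while (true) {
	DataChunk chunk;
	FetchFromSource(chunk);                            // 从source读取一个chunk
	if (chunk.size() == 0) {
		break;                                         // source数据耗尽
	}
	for (auto &op : pipeline.GetIntermediateOperators()) {
		RunOperator(op.get(), chunk);                  // 中间算子逐个处理,产出新的chunk
	}
	RunSink(chunk);                                    // 将最终结果汇入sink
}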
执行结束之后任务进入Finish流程,继续后续event的调度,该event到此结束。
PipelinePrepareFinishEvent/PipelineFinishEvent/PipelineCompleteEvent
实际上了解了前两个event的调度执行之后,后面三个就不需要再逐一介绍了:同样是创建对应的Task交由调度器执行,结束之后触发后续event的调度。
不过有一点需要注意:依赖本pipeline的父级pipeline,其pipeline event要等到本pipeline的complete event完成之后才会被调度;这后面三个event更多是用来完成sink的收尾(PrepareFinalize/Finalize)、管理pipeline的资源和追踪执行情况的。
现在任务整体的执行流程是比较明确的了,有关具体的调度器、执行器的逻辑后续单开一节进行说明。