经过Parallel:并行系统了解,现在可以确定的点是,duckdb任务的执行是event驱动的,有event维护依赖关系,并转为task发送给TaskScheduler进行调度执行。
而TaskScheduler对于duckdb充分利用计算资源,进行高性能计算非常重要。因此在这里单开一节进行详细说明。
先给出结论,TaskScheduler具有如下优点:
- 高性能:基于moodycamel无锁队列并发队列和轻量信号量实现的高性能任务队列
- 动态扩展:线程池支持可以动态扩展
- numa感知:基于生产者令牌机制,可以让同一生产者的任务在一个线程上按照FIFO执行,不同生产者之间并行执行。
- 内存友好:对于numa集成的内存管理优化
初始化
在duckdb整个DatabaseInstace初始化的时候,就创建了TaskScheduler对象,并设置线程数和启动线程。
scheduler = make_uniq<TaskScheduler>(*this);
// 这里将线程分为了总线程和外部线程,因为duckdb是一个嵌入式的数据库,外部线程指的就是嵌入了duckdb的进程,方便从整体上控制duckdb的线程资源使用
scheduler->SetThreads(config.options.maximum_threads, config.options.external_threads);
scheduler->RelaunchThreads();对应的在TaskScheduler中提供了两个静态方法,以便在需要使用TaskScheduler的地方可以使用统一的方法获取
DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context);
DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db);直接看构造方法
TaskScheduler::TaskScheduler(DatabaseInstance &db)
: db(db), queue(make_uniq<ConcurrentQueue>()),
allocator_flush_threshold(db.config.options.allocator_flush_threshold),
allocator_background_threads(db.config.options.allocator_background_threads), requested_thread_count(0),
current_thread_count(1) {
SetAllocatorBackgroundThreads(db.config.options.allocator_background_threads);
}也就是创建和设置一些字段值,不过这里有两个我们特别需要注意的地方
- 高性能并发队列的创建:高性能的保证
- 后台分配器线程:将设计到阻塞、IO的部分交由这些线程异步处理,让调度器专注于计算。
这两者的详细介绍会在本节的附录部分进行说明。这里跳过,继续关注
TaskScheduler本身的逻辑。
启动
按照设置好的线程数启动线程
void TaskScheduler::RelaunchThreads() {
lock_guard<mutex> t(thread_lock);
auto n = requested_thread_count.load();
RelaunchThreadsInternal(n);
}duckdb支持外部动态修改线程数,每次修改之后都需要调用此函数来重新启动线程。
完整的启动逻辑如下
- 获取新的调度线程数
- 若新的调度线程数与现在的已经启动的线程数相同,则直接返回,否则执行下一步3。
- 要收缩线程,则先将当前的所有线程关闭掉。然后接着走下一步的创建新线程。要是本来就是增加线程就会直接走下一步。
- 计算出需要新增的线程数,然后创建线程
- 刷新一下分配器
void TaskScheduler::RelaunchThreadsInternal(int32_t n) {
#ifndef DUCKDB_NO_THREADS
auto &config = DBConfig::GetConfig(db);
auto new_thread_count = NumericCast<idx_t>(n);
if (threads.size() == new_thread_count) {
current_thread_count = NumericCast<int32_t>(threads.size() + config.options.external_threads);
return;
}
// 收缩线程,直接关闭现在所有的线程。然后再创建新的线程数的线程
if (threads.size() > new_thread_count) {
// we are reducing the number of threads: clear all threads first
for (idx_t i = 0; i < threads.size(); i++) {
*markers[i] = false;
}
Signal(threads.size());
// now join the threads to ensure they are fully stopped before erasing them
for (idx_t i = 0; i < threads.size(); i++) {
threads[i]->internal_thread->join();
}
// erase the threads/markers
threads.clear();
markers.clear();
}
// 增加线程
if (threads.size() < new_thread_count) {
// we are increasing the number of threads: launch them and run tasks on them
idx_t create_new_threads = new_thread_count - threads.size();
for (idx_t i = 0; i < create_new_threads; i++) {
// launch a thread and assign it a cancellation marker
auto marker = unique_ptr<atomic<bool>>(new atomic<bool>(true));
unique_ptr<thread> worker_thread;
try {
worker_thread = make_uniq<thread>(ThreadExecuteTasks, this, marker.get());
} catch (std::exception &ex) {
// thread constructor failed - this can happen when the system has too many threads allocated
// in this case we cannot allocate more threads - stop launching them
break;
}
auto thread_wrapper = make_uniq<SchedulerThread>(std::move(worker_thread));
threads.push_back(std::move(thread_wrapper));
markers.push_back(std::move(marker));
}
}
current_thread_count = NumericCast<int32_t>(threads.size() + config.options.external_threads);
if (Allocator::SupportsFlush()) {
Allocator::FlushAll();
}
#endif
}执行
TaskScheduler的执行逻辑倒是非常明显。每个Task可以看作做一个数据分片的一次计算操作。每完成一次执行,就按照任务的执行的结果来决定如何对其进行下一步的调度。
不过除了执行任务之外,duckdb还在这里利用每次执行完任务之后,线程的空闲时间碎片来对分配器进行刷新。实现智能内存管理的功能,避免内存碎片积累和长期占用。
void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
#ifndef DUCKDB_NO_THREADS
static constexpr const int64_t INITIAL_FLUSH_WAIT = 500000; // initial wait time of 0.5s (in mus) before flushing
auto &config = DBConfig::GetConfig(db);
shared_ptr<Task> task;
// marker用来外部控制线程关闭的
while (*marker) {
// 智能内存清理策略
if (!Allocator::SupportsFlush()) {
// 1. 不支持刷新,就不需要清理,等待有任务然后执行即可
queue->semaphore.wait();
} else if (!queue->semaphore.wait(INITIAL_FLUSH_WAIT)) {
// 2. 内存支持刷新,等了0.5s也没有任务到来,说明该线程空闲了0.5s,按照配置的值进行内存刷新
Allocator::ThreadFlush(allocator_background_threads, allocator_flush_threshold,
NumericCast<idx_t>(requested_thread_count.load()));
// 3.上面是轻度清理,清理明显废弃的内存。清理之后再判断一下是否要进行深度内存清理。
auto decay_delay = Allocator::DecayDelay();
if (!decay_delay.IsValid()) {
// 3.1 不需要进行深度清理,则进入无限等待,直到有任务
queue->semaphore.wait();
} else {
// 3.2 需要深度清理,并且在decay_delay时间内该线程一直处于空闲,那么就准备执行深度清理。清理之后进入无限等待,直到有任务到来
if (!queue->semaphore.wait(UnsafeNumericCast<int64_t>(decay_delay.GetIndex()) * 1000000 -
INITIAL_FLUSH_WAIT)) {
Allocator::ThreadIdle();
queue->semaphore.wait();
}
}
}
// 从队列当中取出任务并执行
if (queue->q.try_dequeue(task)) {
auto process_mode = config.options.scheduler_process_partial ? TaskExecutionMode::PROCESS_PARTIAL
: TaskExecutionMode::PROCESS_ALL;
auto execute_result = task->Execute(process_mode);
// 按照执行的结果来决定下一步的调度
switch (execute_result) {
case TaskExecutionResult::TASK_FINISHED:
case TaskExecutionResult::TASK_ERROR:
task.reset();
break;
case TaskExecutionResult::TASK_NOT_FINISHED: {
// task is not finished - reschedule immediately
auto &token = *task->token;
queue->Enqueue(token, std::move(task));
break;
}
case TaskExecutionResult::TASK_BLOCKED:
task->Deschedule();
task.reset();
break;
}
}
}
// 线程退出之前也做一下内存清理
if (Allocator::SupportsFlush()) {
Allocator::ThreadFlush(allocator_background_threads, 0, NumericCast<idx_t>(requested_thread_count.load()));
Allocator::ThreadIdle();
}
#else
throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed.");
#endif
}TaskScheduler本身的逻辑就这样了。有关其高性能的部分,在下面的附录的并发队列、分配器中再进行说明。
优化
TaskScheduler中有一个GetEstimatedCPUId,可以获取到当前线程所属的cpu号,根据这个信息,可以做很多优化。
查看调用TaskScheduler::GetEstimatedCPUId的位置,主要可以发现以下的优化
- 内存使用统计缓存优化。每个线程更新内存使用情况的时候会根据cpu id选择不同的slot,减少原子操作的竞争
- 临时文件压缩自适应优化。压缩算法自适应策略需要记录性能指标,多线程共享会造成竞争。为每个cpu id维护独立的压缩自适应状态。
但是有关NUMA优化的部分仍然没有,duckdb还是使用的全局内存管理。cpu id目前还是主要用来解决共享状态的竞争问题。