经过Parallel:并行系统了解,现在可以确定的点是,duckdb任务的执行是event驱动的,有event维护依赖关系,并转为task发送给TaskScheduler进行调度执行。

TaskScheduler对于duckdb充分利用计算资源,进行高性能计算非常重要。因此在这里单开一节进行详细说明。

先给出结论,TaskScheduler具有如下优点:

  1. 高性能:基于moodycamel无锁队列并发队列和轻量信号量实现的高性能任务队列
  2. 动态扩展:线程池支持可以动态扩展
  3. numa感知:基于生产者令牌机制,可以让同一生产者的任务在一个线程上按照FIFO执行,不同生产者之间并行执行。
  4. 内存友好:对于numa集成的内存管理优化

初始化

在duckdb整个DatabaseInstace初始化的时候,就创建了TaskScheduler对象,并设置线程数和启动线程。

scheduler = make_uniq<TaskScheduler>(*this);
// 这里将线程分为了总线程和外部线程,因为duckdb是一个嵌入式的数据库,外部线程指的就是嵌入了duckdb的进程,方便从整体上控制duckdb的线程资源使用
scheduler->SetThreads(config.options.maximum_threads, config.options.external_threads);
scheduler->RelaunchThreads();

对应的在TaskScheduler中提供了两个静态方法,以便在需要使用TaskScheduler的地方可以使用统一的方法获取

 
DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context);
DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db);

直接看构造方法

TaskScheduler::TaskScheduler(DatabaseInstance &db)
    : db(db), queue(make_uniq<ConcurrentQueue>()),
      allocator_flush_threshold(db.config.options.allocator_flush_threshold),
      allocator_background_threads(db.config.options.allocator_background_threads), requested_thread_count(0),
      current_thread_count(1) {
	SetAllocatorBackgroundThreads(db.config.options.allocator_background_threads);
}

也就是创建和设置一些字段值,不过这里有两个我们特别需要注意的地方

  1. 高性能并发队列的创建:高性能的保证
  2. 后台分配器线程:将设计到阻塞、IO的部分交由这些线程异步处理,让调度器专注于计算。 这两者的详细介绍会在本节的附录部分进行说明。这里跳过,继续关注TaskScheduler本身的逻辑。

启动

按照设置好的线程数启动线程

void TaskScheduler::RelaunchThreads() {
	lock_guard<mutex> t(thread_lock);
	auto n = requested_thread_count.load();
	RelaunchThreadsInternal(n);
}

duckdb支持外部动态修改线程数,每次修改之后都需要调用此函数来重新启动线程。

完整的启动逻辑如下

  1. 获取新的调度线程数
  2. 若新的调度线程数与现在的已经启动的线程数相同,则直接返回,否则执行下一步3。
  3. 要收缩线程,则先将当前的所有线程关闭掉。然后接着走下一步的创建新线程。要是本来就是增加线程就会直接走下一步。
  4. 计算出需要新增的线程数,然后创建线程
  5. 刷新一下分配器
void TaskScheduler::RelaunchThreadsInternal(int32_t n) {
#ifndef DUCKDB_NO_THREADS
	auto &config = DBConfig::GetConfig(db);
	auto new_thread_count = NumericCast<idx_t>(n);
	if (threads.size() == new_thread_count) {
		current_thread_count = NumericCast<int32_t>(threads.size() + config.options.external_threads);
		return;
	}
	// 收缩线程,直接关闭现在所有的线程。然后再创建新的线程数的线程
	if (threads.size() > new_thread_count) {
		// we are reducing the number of threads: clear all threads first
		for (idx_t i = 0; i < threads.size(); i++) {
			*markers[i] = false;
		}
		Signal(threads.size());
		// now join the threads to ensure they are fully stopped before erasing them
		for (idx_t i = 0; i < threads.size(); i++) {
			threads[i]->internal_thread->join();
		}
		// erase the threads/markers
		threads.clear();
		markers.clear();
	}
	// 增加线程
	if (threads.size() < new_thread_count) {
		// we are increasing the number of threads: launch them and run tasks on them
		idx_t create_new_threads = new_thread_count - threads.size();
		for (idx_t i = 0; i < create_new_threads; i++) {
			// launch a thread and assign it a cancellation marker
			auto marker = unique_ptr<atomic<bool>>(new atomic<bool>(true));
			unique_ptr<thread> worker_thread;
			try {
				worker_thread = make_uniq<thread>(ThreadExecuteTasks, this, marker.get());
			} catch (std::exception &ex) {
				// thread constructor failed - this can happen when the system has too many threads allocated
				// in this case we cannot allocate more threads - stop launching them
				break;
			}
			auto thread_wrapper = make_uniq<SchedulerThread>(std::move(worker_thread));
 
			threads.push_back(std::move(thread_wrapper));
			markers.push_back(std::move(marker));
		}
	}
	current_thread_count = NumericCast<int32_t>(threads.size() + config.options.external_threads);
	if (Allocator::SupportsFlush()) {
		Allocator::FlushAll();
	}
#endif
}

执行

TaskScheduler的执行逻辑倒是非常明显。每个Task可以看作做一个数据分片的一次计算操作。每完成一次执行,就按照任务的执行的结果来决定如何对其进行下一步的调度。

不过除了执行任务之外,duckdb还在这里利用每次执行完任务之后,线程的空闲时间碎片来对分配器进行刷新。实现智能内存管理的功能,避免内存碎片积累和长期占用。

void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
#ifndef DUCKDB_NO_THREADS
	static constexpr const int64_t INITIAL_FLUSH_WAIT = 500000; // initial wait time of 0.5s (in mus) before flushing
 
	auto &config = DBConfig::GetConfig(db);
	shared_ptr<Task> task;
	// marker用来外部控制线程关闭的
	while (*marker) {
		// 智能内存清理策略
		if (!Allocator::SupportsFlush()) {
			// 1. 不支持刷新,就不需要清理,等待有任务然后执行即可
			queue->semaphore.wait();
		} else if (!queue->semaphore.wait(INITIAL_FLUSH_WAIT)) {
			// 2. 内存支持刷新,等了0.5s也没有任务到来,说明该线程空闲了0.5s,按照配置的值进行内存刷新
			Allocator::ThreadFlush(allocator_background_threads, allocator_flush_threshold,
			                       NumericCast<idx_t>(requested_thread_count.load()));
			// 3.上面是轻度清理,清理明显废弃的内存。清理之后再判断一下是否要进行深度内存清理。
			auto decay_delay = Allocator::DecayDelay();
			if (!decay_delay.IsValid()) {
				// 3.1 不需要进行深度清理,则进入无限等待,直到有任务
				queue->semaphore.wait();
			} else {
				// 3.2 需要深度清理,并且在decay_delay时间内该线程一直处于空闲,那么就准备执行深度清理。清理之后进入无限等待,直到有任务到来
				if (!queue->semaphore.wait(UnsafeNumericCast<int64_t>(decay_delay.GetIndex()) * 1000000 -
				                           INITIAL_FLUSH_WAIT)) {
					Allocator::ThreadIdle();
					queue->semaphore.wait();
				}
			}
		}
		// 从队列当中取出任务并执行
		if (queue->q.try_dequeue(task)) {
			auto process_mode = config.options.scheduler_process_partial ? TaskExecutionMode::PROCESS_PARTIAL
			                                                             : TaskExecutionMode::PROCESS_ALL;
			auto execute_result = task->Execute(process_mode);
 
			// 按照执行的结果来决定下一步的调度
			switch (execute_result) {
			case TaskExecutionResult::TASK_FINISHED:
			case TaskExecutionResult::TASK_ERROR:
				task.reset();
				break;
			case TaskExecutionResult::TASK_NOT_FINISHED: {
				// task is not finished - reschedule immediately
				auto &token = *task->token;
				queue->Enqueue(token, std::move(task));
				break;
			}
			case TaskExecutionResult::TASK_BLOCKED:
				task->Deschedule();
				task.reset();
				break;
			}
		}
	}
	// 线程退出之前也做一下内存清理
	if (Allocator::SupportsFlush()) {
		Allocator::ThreadFlush(allocator_background_threads, 0, NumericCast<idx_t>(requested_thread_count.load()));
		Allocator::ThreadIdle();
	}
#else
	throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed.");
#endif
}

TaskScheduler本身的逻辑就这样了。有关其高性能的部分,在下面的附录的并发队列、分配器中再进行说明。

优化

TaskScheduler中有一个GetEstimatedCPUId,可以获取到当前线程所属的cpu号,根据这个信息,可以做很多优化。 查看调用TaskScheduler::GetEstimatedCPUId的位置,主要可以发现以下的优化

  1. 内存使用统计缓存优化。每个线程更新内存使用情况的时候会根据cpu id选择不同的slot,减少原子操作的竞争
  2. 临时文件压缩自适应优化。压缩算法自适应策略需要记录性能指标,多线程共享会造成竞争。为每个cpu id维护独立的压缩自适应状态。

但是有关NUMA优化的部分仍然没有,duckdb还是使用的全局内存管理。cpu id目前还是主要用来解决共享状态的竞争问题。