概述

  1. 协程是一种可以挂起和恢复的函数
  2. c++中的协程是无栈式的,也就是使用的是调用者的线程栈,恢复执行所需要的数据也存在堆上而不是栈上。

借助协程可以使得顺序执行的代码异步执行,比如在不需要显式回调的情况下处理非阻塞I/O。

在函数的定义当中包含下面语句的就可以认为该函数是一个协程

  • co_await expression:suspend当前协程直到被resumed
task<> tcp_echo_server()
{
	char data[1024];
	while (true)
	{
		// 让出当前协程执行async_read_some,等待执行结束后resumed
		std::size_t n = co_await socket.async_read_some(buffer(data));
		// 让出当前协程执行async_write,等待被resumed
		co_await async_write(socket, buffer(data, n));
	}
}
  • co_yield expression:suspend当前协程并返回一个值
generator<unsigned int> iota(unsigned int n = 0) 
{
	while (true)
		co_yield n++;
}
  • co_return expression:结束当前协程的执行并返回一个值
lazy<int> f()
{
	co_return 7;
}

约束

  • 协程不能使用可变参数,不能使用普通的return语句,不能使用占位的返回类型(如auto, Concept)。
  • consteval函数、constexpr函数,构造函数、析构函数、main函数不能是协程

协程执行关键结构

  • promise type:在协程内部操作的对象。协程通过这个对象来提交返回值或者异常。需要注意的是promise对象是协程的一个概念,与标准库中的std::promise没有任何关系。
  • coroutine handle:在协程外部进行操作的对象(用于外部操作协程的)。用于resume协程执行或者销毁协程的frame。
  • coroutine state:这是一个内部的、动态分配的存储(除非被优化掉)对象,这个对象包含一下内容
    • promise object
    • 参数(全部都是值拷贝)
    • 当前挂起点的表示(比如寄存器等信息),resumed时知道从哪里继续,销毁的时候知道哪些局部变量处于作用域中
    • 生命周期跨越当前挂起点的局部变量和临时变量

其实,c++20的coroutinue就是在语言层面定义了一些类型和关键字,只要按照给定的规范编写协程,编译器就可以将其转换为对应的代码,从而完成协程的异步执行。

  • 由于coroutine是无栈协程,因此需要使用coroutine state 来保存执行的上下文信息,以便挂起恢复执行能够继续执行。
  • 在最开始就已经介绍过了,coroutine其实还是一个函数,因此仍然有返回值,就是通过promise type来设置的
  • coroutine挂起之后能够被恢复,就是通过coroutine handle来完成的,挂起的时候,将自己的句柄也一并提交给需要创建的awaitable操作,以便这个awaitable对象执行完成之后能够恢复这个协程的执行。 结合下面的执行流程或者就更加明晰了。

协程执行流程

协程的启动流程

一个coroutine的启动执行流程

  1. 使用new关键字申请coroutine state对象
  2. 将函数的所有的参数拷贝到coroutine state对象当中:按值传递的参数会被move或者copy,按引用传递的参数仍然保持引用传递(因此如果在所引用的对象生命周期结束之后恢复协程,那么它可能会成为一个悬垂引用)
  3. 调用promise对象的构造函数。如果该promise类型有一个接受所有coroutine参数的构造函数,那么调用该构造函数,并使用复制后的协程参数传参。否则就调用默认的构造函数
  4. 调用promise.get_return_object()并将结果保存在局部变量当中,当协程首次挂起时,该调用结果将返回给调用者。在此步骤及其之前抛出的任何异常都将传播会调用者,而不是放置在promise当中。
  5. 调用promise.initial_suspend()co_await结果。通常promise type要么返回std::suspend_always用于懒加载的协程,要么返回std::suspend_never用于立即启动的协程。
  6. co_await promise.initial_suspend()恢复,开始执行协程函数体

协程的挂起点

在coroutine函数体到达挂起点之后

  1. 之前promise.get_return_object()返回的对象会被隐式转换为协程的返回类型,返回给协程的调用者或者恢复者,继续调用者原本的控制流。(从控制流的角度来看和goto关键字类似,其实无非就是跳转指令)

协程的返回

coroutine执行到co_return语句之后 2. 返回类型为void,例如co_return;语句或者co_return expr;中的expr类型为void,则调用promise.return_void() 3. 非void返回类型,例如co_return expr;其中expr类型不为void。则调用promise.return_value(expr) 4. 按照创建的相反的顺序销毁自动创建的所有变量 5. 调用promise.final_suspend()co_await结果 coroutine执行结束离开等效于co_return;语句,但是如果在promise type中没有找到return_void的声明,那么这个行为就是未定义的。

协程执行异常的处理

在coroutine函数体执行以一个没有被捕获的异常结束的时候,其控制流如下

  1. 捕获这个异常(当然这里的捕获是在coroutine外部),并在catch块中调用promise.unhandled_exception()
  2. 调用promise.final_suspendco_await结果

coroutine state的销毁

无论coroutine是通过co_return结束还是由于未捕获的异常结束,亦或者是通过coroutine handle销毁,都会执行下面的流程来销毁coroutine启动时创建的coroutine state对象

  1. 调用promise type的析构函数
  2. 调用拷贝的函数参数对象的析构函数
  3. 调用delete操作来删除coroutine state。
  4. 返回caller或者resumer执行

Coroutine State动态分配

通常coroutine state对象会通过非数组的operator new来动态分配。在前面也介绍过了,可以说这个对象持有这这个coroutine完成执行的全部信息。coroutine是无栈式的,也正因为此,所以使用operator new来将coroutine state保存在堆中,这样在caller或者resumer执行的协程的时候可以直接使用它们的栈来执行coroutine。

coroutine state中保存了promise对象,promise type定义也会影响new的过程。

  1. 如果promise type定义了一个class级别的operator new,那么就会使用这个替代全局的operator new
  2. 如果promise type定义的operator new的第一个参数类型为std::size_t,其余的参数和coroutine的函数参数类型一致,那么就会将协程的参数传递给这个方法。通过这种方式来实现前置分配器约定,让coroutine可以使用特定的内存分配器来申请coroutine frame。
struct Coroutine::promise_type
{
    // custom non-throwing overload of new
    void* operator new(std::size_t n, Allocator allocator, Arg1, Arg2) noexcept
    {
        if (void* mem = [std::malloc](http://en.cppreference.com/w/cpp/memory/c/malloc)(n))
            return mem;
        return nullptr; // allocation failure
    }
};
// 通过前置分配器约定,coroutine可以使用指定的Allocator来完成内存分配
Coroutine coroutineA(Allocator allocator, Arg1 ,Arg2)

Note

上面说的是通常情况下会使用operator new来申请coroutine state对象,但是还有一些特殊情况能够将operator new的操作优化掉,比如满足如下的条件

  1. coroutine state的生命周期严格限制下其调用者的生命周期内
  2. coroutine frame的大小在调用时就已知 这种场景下,coroutine state可以嵌套在其调用者的栈帧(调用者是普通函数)或者coroutine state(调用者也是一个coroutine)当中,而不需要在堆上来申请内存

coroutine state分配失败,那么coroutine就会抛出std::bad_alloc异常,除非promise type定义了成员函数Promise::get_return_object_on_allocation_failure(),这样的话内存分配失败的时候,就可以立即将这个函数的结果返回给调用者。

promise的定义

promise type的类型是编译器通过coroutine的约束std::coroutine_traits的返回值来决定的。 直接举例来说明

  1. coroutine不是class的成员变量,例如声明为task<void> foo(int x);,其promise type为std::coroutine_traits<task<void>, int>::promise_type
  2. coroutine是class的非右值引用修饰符修饰的成员函数,例如声明为task<void> Bar::foo(int x) const,其promise type为std::coroutine_traits<task<void>, const Bar&, int>::promise_type
  3. coroutine是class右值引用修饰符修饰的成员函数,例如声明为task<void> Bar::foo(int x) &&;,promise type为std::coroutine_traits<tasK<void>, Bar&&, int>::promise_type

co_await运算符

co_await这个一元运算符用于挂起coroutine并将控制权返回给caller。但是从挂起到返回控制权的这个过程中具体做了哪些内容呢,就是本小节要说明的内容。

co_await expr;这个语句只能出现在普通的函数体(包括lambda函数)的求值表达式里。

co_await expr;这个语句的执行流程如下

首先,将expr转为awaitable

  1. 如果expr是由promise type的initial suspend,final suspend,或者yield表达式产生的,那么expr本身就是awaitable的
  2. 否则,如果当前coroutine的promise type具有成员函数await_transform,那么就会通过promise.await_transform(expr)转为awaitable的
  3. 否则,要求expr本身是awaitable的

接着,按照下面的方式获取awaiter对象

  1. 有更合适的operator co_await重载,那么优先使用重载的调用结果来获取awaiter对象
    1. 成员函数重载,如awaitable.operator co_await()
    2. 非成员函数重载,如co_await(static_cast<Awaitable&&>(awaitable))
  2. 否则没有重载的话,awaiter就是awaitable本身 如果上面的表达式是一个prvalue(纯右值),编译器会从右值临时物化一个对象作为awaiter对象。如果表达式时glvalue(广义左值),那么awaiter对象就是它引用的对象。

现在awaiter对象获取到了,接着就是调用其awaiter.await_ready()函数(这是一个快捷路径,要是结束已经准备好了或者完成了,那么就可以避免挂起的开销了)。要是返回的布尔结果是false的话,那么就要进入挂起的流程了。

Info

前面介绍co_await expr;的时候就是简单地提了一下挂起从routine,返回控制权,但是经过上面的内容可以发现在进入挂起之前做得操作还是挺多的。

挂起的流程如下,调用awaiter.await_suspend(handle),其中handle是当前coroutine的句柄,通过这个句柄可以观察到这个挂起的coroutine的coroutine state对象,这个函数负责将这个句柄安排给某个执行器,以便将来resume或者销毁该coroutine。

  • 如果await_suspend函数类型是void,那么控制权会立即返回给当前coroutine的caller或者resumer
  • 否则如果await_suspend的返回类型是bool的,根据返回的结果
    • true:和void相同,也是立即将控制权返回给当前coroutine的caller或者resumer.
    • false:恢复当前协程的执行
  • 如果await_suspend返回的时其他coroutine的handle,那么就会resume其他的coroutine
  • 如果await_suspend抛出异常,则捕获该异常,恢复coroutine,并立即重新抛出该异常。

最后,执行awaiter.await_resume(),这个结果就是co_await expr;最终返回的结果。coroutine被co_await expr;语句挂起再恢复后,恢复点是在awaiter.await_resume()之前的。

因此一句co_await expr;可能被编译器处理为

Awaitable awiaitable = promise.await_transform(expr);
if (!awaitable.await_ready()) {
	awaitable.await_suspend(handle); // 返回结果处理忽略
}
awaitable.await_resume();

Note

在进入awaiter.await_suspend()之前,coroutine就已经被挂起了,并不是在执行之后才挂起的。而且coroutine的句柄是可以被多个线程共享的,并且在await_suspend()函数返回之前被resume。因此如果coroutine的句柄在没有锁的情况下被跨线程共享,那么awaiter至少使用release语义,而resumer至少使用acquire语义。 举例说明一下,比如coroutine handle被放入异步I/O操作完成的回调当中,这个异步I/O操作在一个线程上完成并执行回调的时候,就会将coroutine resume执行,可能存在resume之后协程执行了awaiter的析构函数,但是await_suspend()函数还没有执行完毕,这就要求所有并发操作await_suspend应该将*this是为已经被销毁了,并且在handle发布到其他线程之后不应该再访问

下面是一个可能存在隐患的示例

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
 
 
// 定义一个coroutine
struct task
{
    struct promise_type
    {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};
 
// 创建一个awitable和awaiter对象
auto switch_to_new_thread(std::jthread& out)
{
    struct awaitable
    {
        std::jthread* p_out;
        bool await_ready() { return false; }
        void await_suspend(std::coroutine_handle<> h)
        {
            std::jthread& out = *p_out;
            if (out.joinable())
                throw std::runtime_error("Output jthread parameter not empty");
            // 这里做了一个异步操作,会resume coroutine
            out = std::jthread([h] { h.resume(); });
            // 可能存在未定义的行为,因为上面的线程异步resume,可能在执行下面的语句的时候,awaiter的析构函数已经执行完成了,p_out成员变量已经销毁了
            // 最好在上面句柄移交之后该函数就结束了
            // std::cout << "New thread ID: " << p_out->get_id() << '\n';
            std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
        }
        void await_resume() {}
    };
    return awaitable{&out};
}
 
 
 
task resuming_on_new_thread(std::jthread& out)
{
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // 执行到这里awaiter对象已经被销毁了
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
 
int main()
{
    std::jthread out;
    resuming_on_new_thread(out);
}

其实co_await expr;经过上面的介绍就是经过awaitable和awaiter的转换,然后依次调用c++为awaiter对象约定的三个方法

  • await_ready
  • await_suspend
  • await_resume 与普通方法不一样的是,这些方法在实现声明的时候返回值的类型,以及返回值的不同,会走不同的逻辑。

co_yield操作

co_yield expr;会挂起coroutine,并返回一个值给调用者。等价于co_await promise.yield_value(expr)。通常会将参数移动会拷贝到生成器当中,因为其生命周期跨越了co_await内部的挂起点。

直接看使用示例吧,使用co_yield通常也是用来创建一个生成器,每次调用获取一个值,只是这里是异步的而已。

下面就是一个使用coroutine来生成斐波那契数列的示例

#include <coroutine>
#include <cstdint>
#include <exception>
#include <iostream>
 
// 创建一个Generator,但是本质上也是coroutine
template<typename T>
struct Generator

	// coroutine的promise type
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;
 
    struct promise_type // required
    {
        T value_;
        std::exception_ptr exception_;
		// 返回Geneartor对象
        Generator get_return_object()
        {
            return Generator(handle_type::from_promise(*this));
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() { exception_ = std::current_exception(); } 
        
        template<std::convertible_to<T> From> // C++20 concept
        std::suspend_always yield_value(From&& from)
        {
	        // co_yield直接转发值
            value_ = std::forward<From>(from); // caching the result in promise
            return {};
        }
        void return_void() {}
    };
	 // coroutine handle
    handle_type h_;
 
    Generator(handle_type h) : h_(h) {}
    ~Generator() { h_.destroy(); }
    explicit operator bool()
    {
        fill(); // The only way to reliably find out whether or not we finished 
        return !h_.done();
    }
    // operator()重载,提供函数式的调用
    T operator()()
    {
        fill();
        full_ = false; // we are going to move out previously cached
                       // result to make promise empty again
        return std::move(h_.promise().value_);
    }
 
private:
    bool full_ = false;
 
    void fill()
    {
        if (!full_)
        {
            h_();
            if (h_.promise().exception_)
                std::rethrow_exception(h_.promise().exception_);
            // propagate coroutine exception in called context
 
            full_ = true;
        }
    }
};
 
Generator<std::uint64_t>
fibonacci_sequence(unsigned n)
{
    if (n == 0)
        co_return;
 
    if (n > 94)
        throw std::runtime_error("Too big Fibonacci sequence. Elements would overflow.");
 
    co_yield 0;
 
    if (n == 1)
        co_return;
 
    co_yield 1;
 
    if (n == 2)
        co_return;
 
    std::uint64_t a = 0;
    std::uint64_t b = 1;
 
    for (unsigned i = 2; i < n; ++i)
    {
        std::uint64_t s = a + b;
        co_yield s;
        a = b;
        b = s;
    }
}
 
int main()
{
    try
    {
        auto gen = fibonacci_sequence(10); // max 94 before uint64_t overflows
 
        for (int j = 0; gen; ++j)
            std::cout << "fib(" << j << ")=" << gen() << '\n';
    }
    catch (const std::exception& ex)
    {
        std::cerr << "Exception: " << ex.what() << '\n';
    }
    catch (...)
    {
        std::cerr << "Unknown exception.\n";
    }

示例

一些流行的开源项目也都在探索c++20 coroutine的使用。可以学习参考一下


coroutine