概述
- 协程是一种可以挂起和恢复的函数
- c++中的协程是无栈式的,也就是使用的是调用者的线程栈,恢复执行所需要的数据也存在堆上而不是栈上。
借助协程可以使得顺序执行的代码异步执行,比如在不需要显式回调的情况下处理非阻塞I/O。
在函数的定义当中包含下面语句的就可以认为该函数是一个协程
co_await expression:suspend当前协程直到被resumed
task<> tcp_echo_server()
{
char data[1024];
while (true)
{
// 让出当前协程执行async_read_some,等待执行结束后resumed
std::size_t n = co_await socket.async_read_some(buffer(data));
// 让出当前协程执行async_write,等待被resumed
co_await async_write(socket, buffer(data, n));
}
}co_yield expression:suspend当前协程并返回一个值
generator<unsigned int> iota(unsigned int n = 0)
{
while (true)
co_yield n++;
}co_return expression:结束当前协程的执行并返回一个值
lazy<int> f()
{
co_return 7;
}约束
- 协程不能使用可变参数,不能使用普通的return语句,不能使用占位的返回类型(如auto, Concept)。
- consteval函数、constexpr函数,构造函数、析构函数、main函数不能是协程
协程执行关键结构
promise type:在协程内部操作的对象。协程通过这个对象来提交返回值或者异常。需要注意的是promise对象是协程的一个概念,与标准库中的std::promise没有任何关系。coroutine handle:在协程外部进行操作的对象(用于外部操作协程的)。用于resume协程执行或者销毁协程的frame。coroutine state:这是一个内部的、动态分配的存储(除非被优化掉)对象,这个对象包含一下内容- promise object
- 参数(全部都是值拷贝)
- 当前挂起点的表示(比如寄存器等信息),resumed时知道从哪里继续,销毁的时候知道哪些局部变量处于作用域中
- 生命周期跨越当前挂起点的局部变量和临时变量
其实,c++20的coroutinue就是在语言层面定义了一些类型和关键字,只要按照给定的规范编写协程,编译器就可以将其转换为对应的代码,从而完成协程的异步执行。
- 由于coroutine是无栈协程,因此需要使用
coroutine state来保存执行的上下文信息,以便挂起恢复执行能够继续执行。 - 在最开始就已经介绍过了,coroutine其实还是一个函数,因此仍然有返回值,就是通过
promise type来设置的 - coroutine挂起之后能够被恢复,就是通过
coroutine handle来完成的,挂起的时候,将自己的句柄也一并提交给需要创建的awaitable操作,以便这个awaitable对象执行完成之后能够恢复这个协程的执行。 结合下面的执行流程或者就更加明晰了。
协程执行流程
协程的启动流程
一个coroutine的启动执行流程
- 使用new关键字申请
coroutine state对象 - 将函数的所有的参数拷贝到
coroutine state对象当中:按值传递的参数会被move或者copy,按引用传递的参数仍然保持引用传递(因此如果在所引用的对象生命周期结束之后恢复协程,那么它可能会成为一个悬垂引用) - 调用promise对象的构造函数。如果该promise类型有一个接受所有coroutine参数的构造函数,那么调用该构造函数,并使用复制后的协程参数传参。否则就调用默认的构造函数。
- 调用
promise.get_return_object()并将结果保存在局部变量当中,当协程首次挂起时,该调用结果将返回给调用者。在此步骤及其之前抛出的任何异常都将传播会调用者,而不是放置在promise当中。 - 调用
promise.initial_suspend()并co_await结果。通常promise type要么返回std::suspend_always用于懒加载的协程,要么返回std::suspend_never用于立即启动的协程。 - 当
co_await promise.initial_suspend()恢复,开始执行协程函数体
协程的挂起点
在coroutine函数体到达挂起点之后
- 之前
promise.get_return_object()返回的对象会被隐式转换为协程的返回类型,返回给协程的调用者或者恢复者,继续调用者原本的控制流。(从控制流的角度来看和goto关键字类似,其实无非就是跳转指令)
协程的返回
coroutine执行到co_return语句之后
2. 返回类型为void,例如co_return;语句或者co_return expr;中的expr类型为void,则调用promise.return_void()
3. 非void返回类型,例如co_return expr;其中expr类型不为void。则调用promise.return_value(expr)
4. 按照创建的相反的顺序销毁自动创建的所有变量
5. 调用promise.final_suspend()并co_await结果
coroutine执行结束离开等效于co_return;语句,但是如果在promise type中没有找到return_void的声明,那么这个行为就是未定义的。
协程执行异常的处理
在coroutine函数体执行以一个没有被捕获的异常结束的时候,其控制流如下
- 捕获这个异常(当然这里的捕获是在coroutine外部),并在catch块中调用
promise.unhandled_exception() - 调用
promise.final_suspend并co_await结果
coroutine state的销毁
无论coroutine是通过co_return结束还是由于未捕获的异常结束,亦或者是通过coroutine handle销毁,都会执行下面的流程来销毁coroutine启动时创建的coroutine state对象
- 调用
promise type的析构函数 - 调用拷贝的函数参数对象的析构函数
- 调用
delete操作来删除coroutine state。 - 返回caller或者resumer执行
Coroutine State动态分配
通常coroutine state对象会通过非数组的operator new来动态分配。在前面也介绍过了,可以说这个对象持有这这个coroutine完成执行的全部信息。coroutine是无栈式的,也正因为此,所以使用operator new来将coroutine state保存在堆中,这样在caller或者resumer执行的协程的时候可以直接使用它们的栈来执行coroutine。
coroutine state中保存了promise对象,promise type定义也会影响new的过程。
- 如果promise type定义了一个class级别的
operator new,那么就会使用这个替代全局的operator new - 如果
promise type定义的operator new的第一个参数类型为std::size_t,其余的参数和coroutine的函数参数类型一致,那么就会将协程的参数传递给这个方法。通过这种方式来实现前置分配器约定,让coroutine可以使用特定的内存分配器来申请coroutine frame。
struct Coroutine::promise_type
{
// custom non-throwing overload of new
void* operator new(std::size_t n, Allocator allocator, Arg1, Arg2) noexcept
{
if (void* mem = [std::malloc](http://en.cppreference.com/w/cpp/memory/c/malloc)(n))
return mem;
return nullptr; // allocation failure
}
};
// 通过前置分配器约定,coroutine可以使用指定的Allocator来完成内存分配
Coroutine coroutineA(Allocator allocator, Arg1 ,Arg2)Note
上面说的是通常情况下会使用
operator new来申请coroutine state对象,但是还有一些特殊情况能够将operator new的操作优化掉,比如满足如下的条件
coroutine state的生命周期严格限制下其调用者的生命周期内coroutine frame的大小在调用时就已知 这种场景下,coroutine state可以嵌套在其调用者的栈帧(调用者是普通函数)或者coroutine state(调用者也是一个coroutine)当中,而不需要在堆上来申请内存
coroutine state分配失败,那么coroutine就会抛出std::bad_alloc异常,除非promise type定义了成员函数Promise::get_return_object_on_allocation_failure(),这样的话内存分配失败的时候,就可以立即将这个函数的结果返回给调用者。
promise的定义
promise type的类型是编译器通过coroutine的约束std::coroutine_traits的返回值来决定的。
直接举例来说明
- coroutine不是class的成员变量,例如声明为
task<void> foo(int x);,其promise type为std::coroutine_traits<task<void>, int>::promise_type - coroutine是class的非右值引用修饰符修饰的成员函数,例如声明为
task<void> Bar::foo(int x) const,其promise type为std::coroutine_traits<task<void>, const Bar&, int>::promise_type - coroutine是class右值引用修饰符修饰的成员函数,例如声明为
task<void> Bar::foo(int x) &&;,promise type为std::coroutine_traits<tasK<void>, Bar&&, int>::promise_type。
co_await运算符
co_await这个一元运算符用于挂起coroutine并将控制权返回给caller。但是从挂起到返回控制权的这个过程中具体做了哪些内容呢,就是本小节要说明的内容。
co_await expr;这个语句只能出现在普通的函数体(包括lambda函数)的求值表达式里。
co_await expr;这个语句的执行流程如下
首先,将expr转为awaitable
- 如果expr是由promise type的initial suspend,final suspend,或者yield表达式产生的,那么expr本身就是awaitable的
- 否则,如果当前coroutine的promise type具有成员函数
await_transform,那么就会通过promise.await_transform(expr)转为awaitable的 - 否则,要求expr本身是awaitable的
接着,按照下面的方式获取awaiter对象
- 有更合适的
operator co_await重载,那么优先使用重载的调用结果来获取awaiter对象- 成员函数重载,如
awaitable.operator co_await() - 非成员函数重载,如
co_await(static_cast<Awaitable&&>(awaitable))
- 成员函数重载,如
- 否则没有重载的话,awaiter就是awaitable本身 如果上面的表达式是一个prvalue(纯右值),编译器会从右值临时物化一个对象作为awaiter对象。如果表达式时glvalue(广义左值),那么awaiter对象就是它引用的对象。
现在awaiter对象获取到了,接着就是调用其awaiter.await_ready()函数(这是一个快捷路径,要是结束已经准备好了或者完成了,那么就可以避免挂起的开销了)。要是返回的布尔结果是false的话,那么就要进入挂起的流程了。
Info
前面介绍
co_await expr;的时候就是简单地提了一下挂起从routine,返回控制权,但是经过上面的内容可以发现在进入挂起之前做得操作还是挺多的。
挂起的流程如下,调用awaiter.await_suspend(handle),其中handle是当前coroutine的句柄,通过这个句柄可以观察到这个挂起的coroutine的coroutine state对象,这个函数负责将这个句柄安排给某个执行器,以便将来resume或者销毁该coroutine。
- 如果
await_suspend函数类型是void,那么控制权会立即返回给当前coroutine的caller或者resumer - 否则如果
await_suspend的返回类型是bool的,根据返回的结果- true:和void相同,也是立即将控制权返回给当前coroutine的caller或者resumer.
- false:恢复当前协程的执行
- 如果
await_suspend返回的时其他coroutine的handle,那么就会resume其他的coroutine - 如果
await_suspend抛出异常,则捕获该异常,恢复coroutine,并立即重新抛出该异常。
最后,执行awaiter.await_resume(),这个结果就是co_await expr;最终返回的结果。coroutine被co_await expr;语句挂起再恢复后,恢复点是在awaiter.await_resume()之前的。
因此一句co_await expr;可能被编译器处理为
Awaitable awiaitable = promise.await_transform(expr);
if (!awaitable.await_ready()) {
awaitable.await_suspend(handle); // 返回结果处理忽略
}
awaitable.await_resume();Note
在进入
awaiter.await_suspend()之前,coroutine就已经被挂起了,并不是在执行之后才挂起的。而且coroutine的句柄是可以被多个线程共享的,并且在await_suspend()函数返回之前被resume。因此如果coroutine的句柄在没有锁的情况下被跨线程共享,那么awaiter至少使用release语义,而resumer至少使用acquire语义。 举例说明一下,比如coroutine handle被放入异步I/O操作完成的回调当中,这个异步I/O操作在一个线程上完成并执行回调的时候,就会将coroutine resume执行,可能存在resume之后协程执行了awaiter的析构函数,但是await_suspend()函数还没有执行完毕,这就要求所有并发操作await_suspend应该将*this是为已经被销毁了,并且在handle发布到其他线程之后不应该再访问
下面是一个可能存在隐患的示例
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
// 定义一个coroutine
struct task
{
struct promise_type
{
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
// 创建一个awitable和awaiter对象
auto switch_to_new_thread(std::jthread& out)
{
struct awaitable
{
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h)
{
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
// 这里做了一个异步操作,会resume coroutine
out = std::jthread([h] { h.resume(); });
// 可能存在未定义的行为,因为上面的线程异步resume,可能在执行下面的语句的时候,awaiter的析构函数已经执行完成了,p_out成员变量已经销毁了
// 最好在上面句柄移交之后该函数就结束了
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
}
void await_resume() {}
};
return awaitable{&out};
}
task resuming_on_new_thread(std::jthread& out)
{
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_new_thread(out);
// 执行到这里awaiter对象已经被销毁了
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
int main()
{
std::jthread out;
resuming_on_new_thread(out);
}其实co_await expr;经过上面的介绍就是经过awaitable和awaiter的转换,然后依次调用c++为awaiter对象约定的三个方法
await_readyawait_suspendawait_resume与普通方法不一样的是,这些方法在实现声明的时候返回值的类型,以及返回值的不同,会走不同的逻辑。
co_yield操作
co_yield expr;会挂起coroutine,并返回一个值给调用者。等价于co_await promise.yield_value(expr)。通常会将参数移动会拷贝到生成器当中,因为其生命周期跨越了co_await内部的挂起点。
直接看使用示例吧,使用co_yield通常也是用来创建一个生成器,每次调用获取一个值,只是这里是异步的而已。
下面就是一个使用coroutine来生成斐波那契数列的示例
#include <coroutine>
#include <cstdint>
#include <exception>
#include <iostream>
// 创建一个Generator,但是本质上也是coroutine
template<typename T>
struct Generator
{
// coroutine的promise type
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
struct promise_type // required
{
T value_;
std::exception_ptr exception_;
// 返回Geneartor对象
Generator get_return_object()
{
return Generator(handle_type::from_promise(*this));
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() { exception_ = std::current_exception(); }
template<std::convertible_to<T> From> // C++20 concept
std::suspend_always yield_value(From&& from)
{
// co_yield直接转发值
value_ = std::forward<From>(from); // caching the result in promise
return {};
}
void return_void() {}
};
// coroutine handle
handle_type h_;
Generator(handle_type h) : h_(h) {}
~Generator() { h_.destroy(); }
explicit operator bool()
{
fill(); // The only way to reliably find out whether or not we finished
return !h_.done();
}
// operator()重载,提供函数式的调用
T operator()()
{
fill();
full_ = false; // we are going to move out previously cached
// result to make promise empty again
return std::move(h_.promise().value_);
}
private:
bool full_ = false;
void fill()
{
if (!full_)
{
h_();
if (h_.promise().exception_)
std::rethrow_exception(h_.promise().exception_);
// propagate coroutine exception in called context
full_ = true;
}
}
};
Generator<std::uint64_t>
fibonacci_sequence(unsigned n)
{
if (n == 0)
co_return;
if (n > 94)
throw std::runtime_error("Too big Fibonacci sequence. Elements would overflow.");
co_yield 0;
if (n == 1)
co_return;
co_yield 1;
if (n == 2)
co_return;
std::uint64_t a = 0;
std::uint64_t b = 1;
for (unsigned i = 2; i < n; ++i)
{
std::uint64_t s = a + b;
co_yield s;
a = b;
b = s;
}
}
int main()
{
try
{
auto gen = fibonacci_sequence(10); // max 94 before uint64_t overflows
for (int j = 0; gen; ++j)
std::cout << "fib(" << j << ")=" << gen() << '\n';
}
catch (const std::exception& ex)
{
std::cerr << "Exception: " << ex.what() << '\n';
}
catch (...)
{
std::cerr << "Unknown exception.\n";
}示例
一些流行的开源项目也都在探索c++20 coroutine的使用。可以学习参考一下