Task

Task就是folly实现的一种coroutine,精简下来的定义如下

template <typename T>  
class FOLLY_CORO_TASK_ATTRS Task {  
 public:  
  using promise_type = detail::TaskPromise<T>;  
 
private:
  using handle_t = coroutine_handle<promise_type>;
  Task(handle_t coro) noexcept : coro_(coro) {}
  handle_t coro_;
};

Task只有一个字段,那么就是所持有的coroutine句柄。至于promise type就是detail::TaskPromise<T>了。也是corouine的核心类型

TaskPromise

template <typename T>  
class TaskPromise final : public TaskPromiseCrtpBase<TaskPromise<T>, T> {
using StorageType =  
    typename TaskPromiseCrtpBase<TaskPromise<T>, T>::StorageType;
 
// 用来设置返回值
template <typename U = T>  
void return_value(U&& value) {  
  if constexpr (std::is_same_v<remove_cvref_t<U>, Try<StorageType>>) {  
	// 场景1:当返回值类型是 Try<StorageType> 时
    DCHECK(value.hasValue() || (value.hasException() && value.exception()));  
    this->result_ = static_cast<U&&>(value);  
  } else if constexpr (  
      std::is_same_v<remove_cvref_t<U>, Try<void>> &&  
      std::is_same_v<remove_cvref_t<T>, Unit>) {  
    // 场景2:T为Unit,U为Try<void>时的特化。用于处理StorageType为Unit,但是函数调用返回值为Try<void>时的桥接处理
    DCHECK(value.hasValue() || (value.hasException() && value.exception()));  
    this->result_ = static_cast<Try<Unit>>(static_cast<U&&>(value));  
  } else {  
    // 场景3:通用类型直接转换
    static_assert(  
        std::is_convertible<U&&, StorageType>::value,  
        "cannot convert return value to type T");  
    this->result_.emplace(static_cast<U&&>(value));  
  }  
}
 
};

TaskPromise里面只有return_value方法用于co_return expr;。这里使用StorageType来存储返回值,使用了模板特化以及条件编译来处理不同的返回值类型

  1. co_return expr;的expr值本身就是StorageType,那么直接设置给result_字段即可
  2. 处理Try<void>Unit的转换
  3. expr表达式需要是可以转换为StorageType的类型

TaskPromise是一个子类,其他相关的函数都在其继承的基类当中。基类又拆分了TaskPromiseCrtpBaseTaskPromiseBase前者主要用于将公共代码和需要特化的泛型代码从后者分离开,用于减少编译器的特化量。

// Separate from `TaskPromiseBase` so the compiler has less to specialize.  
template <typename Promise, typename T>  
class TaskPromiseCrtpBase  
    : public TaskPromiseBase,  
      public ExtendedCoroutinePromise {  
 public:  
  // 对std::reference_wrapper<T>的封装,可以持有引用对象
  using StorageType = detail::lift_lvalue_reference_t<T>;  
  // 由编译器在coroutine入口处调用,用来构造Task对象
  Task<T> get_return_object() noexcept;  
  // coroutine执行过程中抛出的未捕获的异常,由编译器捕获并调用此函数
  void unhandled_exception() noexcept {  
    result_.emplaceException(exception_wrapper{current_exception()});  
  }  
  
  Try<StorageType>& result() { return result_; }  
 
  // co_yield expr;产生异常
  auto yield_value(co_error ex) {  
    result_.emplaceException(std::move(ex.exception()));  
    return final_suspend();  
  }  
  // co_yield expr;的一种重载
  auto yield_value(co_result<StorageType>&& result) {  
    result_ = std::move(result.result());  
    return final_suspend();  
  }  
  
  using TaskPromiseBase::await_transform;  
 
  // 允许在协程内部执行co_await co_safe_point; // co_safe_point是co_safe_point_t的一个全局对象。
  auto await_transform(co_safe_point_t) noexcept {  
    return do_safe_point(*this);  
  }  
  
 protected:  
  // 用于持有协程返回值
  Try<StorageType> result_;  
};

TaskPromise中设置了coroutine结束co_return expr;如何处理。这里在TaskPromiseCrtpBase则是设置了co_yield expr;的返回值处理、异常处理。从这个类名中就可以看出来,这个类多了一些控制能力,值得注意的是await_transform(co_safe_point_t)方法,使得在Task这个coroutine里支持一种co_await co_safe_point;的写法,用于表示coroutine执行到了一个安全点,可以检测一下这个是否被取消,然后结束coroutine的执行

看一下do_safe_point的实现

template <typename Promise>  
variant_awaitable<FinalAwaiter, ready_awaitable<>> do_safe_point(  
    Promise& promise) noexcept {  
  // 根据cancelToken_字段的值用于判断是否取消,取消了就直接调用yield_value方法结束coroutine
  if (cancelToken_.isCancellationRequested()) {  
    return promise.yield_value(co_cancelled);  
  }  
  return ready_awaitable<>{};  
}
 
 
class co_cancelled_t final {  
 public:  
  // 可以转为co_error对象
  /* implicit */ operator co_error() const {  
    return co_error(OperationCancelled{});  
  }  
};  
  
inline constexpr co_cancelled_t co_cancelled{};

上面两个类将Task这个coroutine各种返回值的处理,以及额外的取消控制逻辑处理好了。下面就是最基础的co_await expr;的处理了。

 
// folly定义的promise type的基类
class TaskPromiseBase {  
  
 public:  
  // 使用自定义的内存分配器,保证coroutine的内存也在管控当中
  static void* operator new(std::size_t size) {  
    return ::folly_coro_async_malloc(size);  
  }  
  // 使用自定义的内存释放器
  static void operator delete(void* ptr, std::size_t size) {  
    ::folly_coro_async_free(ptr, size);  
  }  
  // 编译器在coroutine开始时调用
  suspend_always initial_suspend() noexcept { return {}; }  
  // 编译器在coroutine结束时调用
  FinalAwaiter final_suspend() noexcept { return {}; }  
 
  // 各种转为awaitable的重载
  template <  
      typename Awaitable,  
      std::enable_if_t<!must_await_immediately_v<Awaitable>, int> = 0>  
  auto await_transform(Awaitable&& awaitable) {  
    bypassExceptionThrowing_ =  
        bypassExceptionThrowing_ == BypassExceptionThrowing::REQUESTED  
        ? BypassExceptionThrowing::ACTIVE  
        : BypassExceptionThrowing::INACTIVE;  
  
    return folly::coro::co_withAsyncStack(folly::coro::co_viaIfAsync(  
        executor_.get_alias(),  
        folly::coro::co_withCancellation(  
            cancelToken_, static_cast<Awaitable&&>(awaitable))));  
  }  
  template <  
      typename Awaitable,  
      std::enable_if_t<must_await_immediately_v<Awaitable>, int> = 0>  
  auto await_transform(Awaitable awaitable) {  
    bypassExceptionThrowing_ =  
        bypassExceptionThrowing_ == BypassExceptionThrowing::REQUESTED  
        ? BypassExceptionThrowing::ACTIVE  
        : BypassExceptionThrowing::INACTIVE;  
  
    return folly::coro::co_withAsyncStack(folly::coro::co_viaIfAsync(  
        executor_.get_alias(),  
        folly::coro::co_withCancellation(  
            cancelToken_,  
            mustAwaitImmediatelyUnsafeMover(std::move(awaitable))())));  
  }  
  
  template <  
      typename Awaitable,  
      std::enable_if_t<!must_await_immediately_v<Awaitable>, int> = 0>  
  auto await_transform(NothrowAwaitable<Awaitable>&& awaitable) {  
    bypassExceptionThrowing_ = BypassExceptionThrowing::REQUESTED;  
    return await_transform(awaitable.unwrap());  
  }  
  template <  
      typename Awaitable,  
      std::enable_if_t<must_await_immediately_v<Awaitable>, int> = 0>  
  auto await_transform(NothrowAwaitable<Awaitable> awaitable) {  
    bypassExceptionThrowing_ = BypassExceptionThrowing::REQUESTED;  
    return await_transform(  
        mustAwaitImmediatelyUnsafeMover(awaitable.unwrap())());  
  }  
 
  // 可以在Task coroutine内部执行co_await co_current_executor;来获取一个执行器
  auto await_transform(co_current_executor_t) noexcept {  
    return ready_awaitable<folly::Executor*>{executor_.get()};  
  }  
  // 执行co_await co_current_cancellation_token;可以获取当前coroutine的cancelToken_
  auto await_transform(co_current_cancellation_token_t) noexcept {  
    return ready_awaitable<const folly::CancellationToken&>{cancelToken_};  
  }  
};

到这里可以发现coroutine一个非常有意思的点,那就是借助await_transform的各种类型重载,可以支持我们在coroutine的函数体内部扩展各种co_await expr;的写法,来使得coroutine的功能更加丰富。


coroutine