future 表示一个能拿到某个值的预期
libc++ future
逻辑内容:
实现部分在 include/c++/v1/future
和
future 类型如下:
1 2 3 4 5 6 7 8 9 10 11 12
| template <class _Rp> class _LIBCPP_TEMPLATE_VIS _LIBCPP_AVAILABILITY_FUTURE future { __assoc_state<_Rp>* __state_;
explicit future(__assoc_state<_Rp>* __state);
template <class> friend class promise; template <class> friend class shared_future; public: };
|
可以看到,future 包装了一层 __assoc_state<_rp>
, 我们进 __assoc_state
看看:
1 2 3 4 5
| template <class _Rp> class _LIBCPP_AVAILABILITY_FUTURE _LIBCPP_HIDDEN __assoc_state : public __assoc_sub_state { };
|
哦,他是 __assoc_sub_state<_Rp>
的子类:
1 2 3 4 5
| class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE __assoc_sub_state : public __shared_count {
};
|
这里面包了一层 __shared_count
,还记得之前的图吗?这个内部就是一个引用计数,实际上 std::shared_ptr
也搬用了这套实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class _LIBCPP_TYPE_VIS __shared_count { __shared_count(const __shared_count&); __shared_count& operator=(const __shared_count&);
protected: long __shared_owners_; virtual ~__shared_count(); private: virtual void __on_zero_shared() _NOEXCEPT = 0;
public: _LIBCPP_INLINE_VISIBILITY void __add_shared() _NOEXCEPT; _LIBCPP_INLINE_VISIBILITY bool __release_shared() _NOEXCEPT; #endif _LIBCPP_INLINE_VISIBILITY long use_count() const _NOEXCEPT; };
|
这里需要解释一下 __on_zero_shared
,这里可以看作是用户调用的 rc == 0 的时候清除资源的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE __assoc_sub_state : public __shared_count { protected: exception_ptr __exception_; mutable mutex __mut_; mutable condition_variable __cv_; unsigned __state_;
virtual void __on_zero_shared() _NOEXCEPT; void __sub_wait(unique_lock<mutex>& __lk); public: enum { __constructed = 1, __future_attached = 2, ready = 4, deferred = 8 }; };
|
这里内容大概是:
__state
标注了对应的状态
__cv_
和 __mut_
表示了对应的并发状态
__exception_
表示存在这里的异常
这里也有设置状态对应的成员,包括 __set_value
等,基本上是处理并发和设置状态
回头看外面:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| template <class _Rp> class _LIBCPP_AVAILABILITY_FUTURE _LIBCPP_HIDDEN __assoc_state : public __assoc_sub_state { typedef __assoc_sub_state base; typedef typename aligned_storage<sizeof(_Rp), alignment_of<_Rp>::value>::type _Up; protected: _Up __value_;
virtual void __on_zero_shared() _NOEXCEPT; public:
template <class _Arg> void set_value(_Arg&& __arg);
template <class _Arg> void set_value_at_thread_exit(_Arg&& __arg);
_Rp move(); typename add_lvalue_reference<_Rp>::type copy(); };
|
这里可以发现 _Up
存储的是对应的值,而 __assoc_sub_state
存储的是状态。之所以是 aligned_storage
,是因为这是一块可能没初始化的内存。我们继续看看 wait
之类的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| template <class _Rp> template <class _Arg> _LIBCPP_AVAILABILITY_FUTURE void __assoc_state<_Rp>::set_value(_Arg&& __arg) { unique_lock<mutex> __lk(this->__mut_); if (this->__has_value()) __throw_future_error(future_errc::promise_already_satisfied); ::new ((void*)&__value_) _Rp(_VSTD::forward<_Arg>(__arg)); this->__state_ |= base::__constructed | base::ready; __cv_.notify_all(); }
template <class _Rp> _Rp __assoc_state<_Rp>::move() { unique_lock<mutex> __lk(this->__mut_); this->__sub_wait(__lk); if (this->__exception_ != nullptr) rethrow_exception(this->__exception_); return _VSTD::move(*reinterpret_cast<_Rp*>(&__value_)); }
template <class _Rp> typename add_lvalue_reference<_Rp>::type __assoc_state<_Rp>::copy() { unique_lock<mutex> __lk(this->__mut_); this->__sub_wait(__lk); if (this->__exception_ != nullptr) rethrow_exception(this->__exception_); return *reinterpret_cast<_Rp*>(&__value_); }
|
首先,__assoc_sub_state
会调用 __sub_wait
,这里会等待到 __assoc_sub_state
状态合理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| void __assoc_sub_state::__sub_wait(unique_lock<mutex>& __lk) { if (!__is_ready()) { if (__state_ & static_cast<unsigned>(deferred)) { __state_ &= ~static_cast<unsigned>(deferred); __lk.unlock(); __execute(); } else while (!__is_ready()) __cv_.wait(__lk); } }
|
这里相当于:
- 设置值:拿到
_sub_state
的锁,然后 placement new,再在 _sub_state
上通知
- 读取值(wait) :等待
_sub_state
状态设置完成,然后拿 __value_
最后,我们看看外层的 future
:
1 2 3 4 5 6 7 8 9
| template <class _Rp> _Rp future<_Rp>::get() { unique_ptr<__shared_count, __release_shared_count> __(__state_); __assoc_state<_Rp>* __s = __state_; __state_ = nullptr; return __s->move(); }
|
promise
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
template <> class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE promise<void> { __assoc_sub_state* __state_;
_LIBCPP_INLINE_VISIBILITY explicit promise(nullptr_t) _NOEXCEPT : __state_(nullptr) {}
template <class> friend class packaged_task;
public: };
|
这里,get_future
会直接返回一个 future:
1 2 3 4 5 6 7 8
| template <class _Rp> future<_Rp> promise<_Rp>::get_future() { if (__state_ == nullptr) __throw_future_error(future_errc::no_state); return future<_Rp>(__state_); }
|
我们回到 future, 看看它的构造函数:
1 2 3 4 5 6
| template <class _Rp> future<_Rp>::future(__assoc_state<_Rp>* __state) : __state_(__state) { __state_->__attach_future(); }
|
这里依靠 __assoc_sub_state<_Rp>::__attach_future
来避免重复使用
packaged_task
std::packaged_task
类似 std::function
,是一个类型擦除的函数,描述任务,这里还允许 get_future
来访问。
shared_future
shared_future
可以包装一个 future<T>
,然后多次使用,它的内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| template <class _Rp> class _LIBCPP_TEMPLATE_VIS shared_future { __assoc_state<_Rp>* __state_;
public:
};
template <class _Rp> shared_future<_Rp>::~shared_future() { if (__state_) __state_->__release_shared(); }
|
shared_future
有一堆偏特化的版本,因为它的 get
会非常恶心,future
返回一次咋搞都行,shared_future
可能要处理很多语义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| template <class _Rp> class _LIBCPP_TEMPLATE_VIS shared_future<_Rp&> { __assoc_state<_Rp&>* __state_;
public: _LIBCPP_INLINE_VISIBILITY _Rp& get() const {return __state_->copy();}
};
template <> class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE shared_future<void> { __assoc_sub_state* __state_;
public:
};
|
std::async
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| template <class _Fp, class... _Args> _LIBCPP_NODISCARD_AFTER_CXX17 future<typename __invoke_of<typename decay<_Fp>::type, typename decay<_Args>::type...>::type> async(launch __policy, _Fp&& __f, _Args&&... __args) { typedef __async_func<typename decay<_Fp>::type, typename decay<_Args>::type...> _BF; typedef typename _BF::_Rp _Rp;
#ifndef _LIBCPP_NO_EXCEPTIONS try { #endif if (__does_policy_contain(__policy, launch::async)) return _VSTD::__make_async_assoc_state<_Rp>(_BF(_VSTD::__decay_copy(_VSTD::forward<_Fp>(__f)), _VSTD::__decay_copy(_VSTD::forward<_Args>(__args))...)); #ifndef _LIBCPP_NO_EXCEPTIONS } catch ( ... ) { if (__policy == launch::async) throw ; } #endif
if (__does_policy_contain(__policy, launch::deferred)) return _VSTD::__make_deferred_assoc_state<_Rp>(_BF(_VSTD::__decay_copy(_VSTD::forward<_Fp>(__f)), _VSTD::__decay_copy(_VSTD::forward<_Args>(__args))...)); return future<_Rp>{}; }
|
里面实现大概是:
- 创建
__async_assoc_state
或者 __defered_assoc_state
- lazy 或者动态的创建线程
- 创建 future
folly Future & Executor
folly Future 相对于 future,原理并没有太大差异,但是外层和实现丰富了很多:
- 使用
folly::Function
,函数对象支持了 SBO 等
then
等组合子,方便处理代码
via
等,支持绑定 executor,并抽出了 Future
和 SemiFuture
等类型
folly 对 Future 的支持相当复杂,继承链大概包括:
1 2 3 4 5
| CoreBase - Core<T>
FutureBase<T> (包含 Core<T>) - Future<T> / SemiFuture<T>
|
这里的 Future 允许链式、poll
来查看是否成功等。SemiFuture
则是状态的描述。Future
有对应的 Executor,靠 KeepAlive
来指向 Executor,SemiFuture
则没有,除了 inline 执行(即在本线程就地执行),不会允许对应的逻辑出现。
CoreBase
描述了 Future
基本的对应状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
|
因为要描述下一个任务这样的 chain,所以这套逻辑还是比较恶心的。此外,folly::Future
逻辑涉及了对应的 Executor
,每个 future 会有 dispatch 给谁的逻辑,这个和链式一样,引入了复杂度。
Why are futures slow
future 在 C++ 中出现在 C++11 的标准库,而别的语言也常有类似的东西。它是由库来实现的。类似 std::function
,std::future
是堆上的、类型擦除的。这两部分抽象都带来了一定的开销。
之后会分别介绍 Seastar future、Rust future、C++ executors,来看看它们是怎么抽象的。简单来说:
- C++ executors 会有 sender / receiver,用模版类型来封装他们,然后提供了各种算法来做抽象
- Rust 靠
Future<T>
来抽象,Box<Future<T>>
来表示动态一些的对象。Future<T>
需要可以 poll()
,上层的状态机来 polling,做查询
- seastar 靠模型的抽象,来简化了相关的逻辑。
References
libcxx future
folly future
浅谈The C++ Executors https://zhuanlan.zhihu.com/p/395250667 (绝世好文)