future and promise in libc++/folly

future 表示一个能拿到某个值的预期

libc++ future

逻辑内容:

libcpp-future

实现部分在 include/c++/v1/future

future 类型如下:

1
2
3
4
5
6
7
8
9
10
11
12
template <class _Rp>
class _LIBCPP_TEMPLATE_VIS _LIBCPP_AVAILABILITY_FUTURE future
{
__assoc_state<_Rp>* __state_;

explicit future(__assoc_state<_Rp>* __state);

template <class> friend class promise;
template <class> friend class shared_future;
public:
// ...
};

可以看到,future 包装了一层 __assoc_state<_rp>, 我们进 __assoc_state 看看:

1
2
3
4
5
template <class _Rp>
class _LIBCPP_AVAILABILITY_FUTURE _LIBCPP_HIDDEN __assoc_state
: public __assoc_sub_state
{//...
};

哦,他是 __assoc_sub_state<_Rp> 的子类:

1
2
3
4
5
class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE __assoc_sub_state
: public __shared_count
{
// ...
};

这里面包了一层 __shared_count,还记得之前的图吗?这个内部就是一个引用计数,实际上 std::shared_ptr 也搬用了这套实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class _LIBCPP_TYPE_VIS __shared_count
{
__shared_count(const __shared_count&);
__shared_count& operator=(const __shared_count&);

protected:
long __shared_owners_;
virtual ~__shared_count();
private:
virtual void __on_zero_shared() _NOEXCEPT = 0;

public:
_LIBCPP_INLINE_VISIBILITY
void __add_shared() _NOEXCEPT;
_LIBCPP_INLINE_VISIBILITY
bool __release_shared() _NOEXCEPT;
#endif
_LIBCPP_INLINE_VISIBILITY
long use_count() const _NOEXCEPT;
};

这里需要解释一下 __on_zero_shared,这里可以看作是用户调用的 rc == 0 的时候清除资源的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE __assoc_sub_state
: public __shared_count
{
protected:
exception_ptr __exception_;
mutable mutex __mut_;
mutable condition_variable __cv_;
unsigned __state_;

virtual void __on_zero_shared() _NOEXCEPT;
void __sub_wait(unique_lock<mutex>& __lk);
public:
enum
{
__constructed = 1,
__future_attached = 2,
ready = 4,
deferred = 8
};
};

这里内容大概是:

  1. __state 标注了对应的状态
  2. __cv___mut_ 表示了对应的并发状态
  3. __exception_ 表示存在这里的异常

这里也有设置状态对应的成员,包括 __set_value 等,基本上是处理并发和设置状态

回头看外面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <class _Rp>
class _LIBCPP_AVAILABILITY_FUTURE _LIBCPP_HIDDEN __assoc_state
: public __assoc_sub_state
{
typedef __assoc_sub_state base;
typedef typename aligned_storage<sizeof(_Rp), alignment_of<_Rp>::value>::type _Up;
protected:
_Up __value_;

virtual void __on_zero_shared() _NOEXCEPT;
public:

template <class _Arg>
void set_value(_Arg&& __arg);

template <class _Arg>
void set_value_at_thread_exit(_Arg&& __arg);

_Rp move();
typename add_lvalue_reference<_Rp>::type copy();
};

这里可以发现 _Up 存储的是对应的值,而 __assoc_sub_state 存储的是状态。之所以是 aligned_storage,是因为这是一块可能没初始化的内存。我们继续看看 wait 之类的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
template <class _Rp>
template <class _Arg>
_LIBCPP_AVAILABILITY_FUTURE
void
__assoc_state<_Rp>::set_value(_Arg&& __arg)
{
unique_lock<mutex> __lk(this->__mut_);
if (this->__has_value())
__throw_future_error(future_errc::promise_already_satisfied);
::new ((void*)&__value_) _Rp(_VSTD::forward<_Arg>(__arg));
this->__state_ |= base::__constructed | base::ready;
__cv_.notify_all();
}

template <class _Rp>
_Rp
__assoc_state<_Rp>::move()
{
unique_lock<mutex> __lk(this->__mut_);
this->__sub_wait(__lk);
if (this->__exception_ != nullptr)
rethrow_exception(this->__exception_);
return _VSTD::move(*reinterpret_cast<_Rp*>(&__value_));
}

template <class _Rp>
typename add_lvalue_reference<_Rp>::type
__assoc_state<_Rp>::copy()
{
unique_lock<mutex> __lk(this->__mut_);
this->__sub_wait(__lk);
if (this->__exception_ != nullptr)
rethrow_exception(this->__exception_);
return *reinterpret_cast<_Rp*>(&__value_);
}

首先,__assoc_sub_state 会调用 __sub_wait,这里会等待到 __assoc_sub_state 状态合理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void
__assoc_sub_state::__sub_wait(unique_lock<mutex>& __lk)
{
if (!__is_ready())
{
if (__state_ & static_cast<unsigned>(deferred))
{
__state_ &= ~static_cast<unsigned>(deferred);
__lk.unlock();
__execute();
}
else
while (!__is_ready())
__cv_.wait(__lk);
}
}

这里相当于:

  1. 设置值:拿到 _sub_state 的锁,然后 placement new,再在 _sub_state 上通知
  2. 读取值(wait) :等待 _sub_state 状态设置完成,然后拿 __value_

最后,我们看看外层的 future:

1
2
3
4
5
6
7
8
9
template <class _Rp>
_Rp
future<_Rp>::get()
{
unique_ptr<__shared_count, __release_shared_count> __(__state_);
__assoc_state<_Rp>* __s = __state_;
__state_ = nullptr;
return __s->move();
}

promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// promise<void>

template <>
class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE promise<void>
{
__assoc_sub_state* __state_;

_LIBCPP_INLINE_VISIBILITY
explicit promise(nullptr_t) _NOEXCEPT : __state_(nullptr) {}

template <class> friend class packaged_task;

public:
// ...
};

这里,get_future 会直接返回一个 future:

1
2
3
4
5
6
7
8
template <class _Rp>
future<_Rp>
promise<_Rp>::get_future()
{
if (__state_ == nullptr)
__throw_future_error(future_errc::no_state);
return future<_Rp>(__state_);
}

我们回到 future, 看看它的构造函数:

1
2
3
4
5
6
template <class _Rp>
future<_Rp>::future(__assoc_state<_Rp>* __state)
: __state_(__state)
{
__state_->__attach_future();
}

这里依靠 __assoc_sub_state<_Rp>::__attach_future 来避免重复使用

packaged_task

std::packaged_task 类似 std::function,是一个类型擦除的函数,描述任务,这里还允许 get_future 来访问。

shared_future

shared_future 可以包装一个 future<T>,然后多次使用,它的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template <class _Rp>
class _LIBCPP_TEMPLATE_VIS shared_future
{
__assoc_state<_Rp>* __state_;

public:

};

template <class _Rp>
shared_future<_Rp>::~shared_future()
{
if (__state_)
__state_->__release_shared();
}

shared_future 有一堆偏特化的版本,因为它的 get 会非常恶心,future 返回一次咋搞都行,shared_future 可能要处理很多语义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <class _Rp>
class _LIBCPP_TEMPLATE_VIS shared_future<_Rp&>
{
__assoc_state<_Rp&>* __state_;

public:
// retrieving the value
_LIBCPP_INLINE_VISIBILITY
_Rp& get() const {return __state_->copy();}

};

template <>
class _LIBCPP_TYPE_VIS _LIBCPP_AVAILABILITY_FUTURE shared_future<void>
{
__assoc_sub_state* __state_;

public:

};

std::async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template <class _Fp, class... _Args>
_LIBCPP_NODISCARD_AFTER_CXX17
future<typename __invoke_of<typename decay<_Fp>::type, typename decay<_Args>::type...>::type>
async(launch __policy, _Fp&& __f, _Args&&... __args)
{
typedef __async_func<typename decay<_Fp>::type, typename decay<_Args>::type...> _BF;
typedef typename _BF::_Rp _Rp;

#ifndef _LIBCPP_NO_EXCEPTIONS
try
{
#endif
if (__does_policy_contain(__policy, launch::async))
return _VSTD::__make_async_assoc_state<_Rp>(_BF(_VSTD::__decay_copy(_VSTD::forward<_Fp>(__f)),
_VSTD::__decay_copy(_VSTD::forward<_Args>(__args))...));
#ifndef _LIBCPP_NO_EXCEPTIONS
}
catch ( ... ) { if (__policy == launch::async) throw ; }
#endif

if (__does_policy_contain(__policy, launch::deferred))
return _VSTD::__make_deferred_assoc_state<_Rp>(_BF(_VSTD::__decay_copy(_VSTD::forward<_Fp>(__f)),
_VSTD::__decay_copy(_VSTD::forward<_Args>(__args))...));
return future<_Rp>{};
}

里面实现大概是:

  1. 创建 __async_assoc_state 或者 __defered_assoc_state
  2. lazy 或者动态的创建线程
  3. 创建 future

folly Future & Executor

folly Future 相对于 future,原理并没有太大差异,但是外层和实现丰富了很多:

  1. 使用 folly::Function,函数对象支持了 SBO 等
  2. then 等组合子,方便处理代码
  3. via 等,支持绑定 executor,并抽出了 FutureSemiFuture 等类型

    folly 对 Future 的支持相当复杂,继承链大概包括:

1
2
3
4
5
CoreBase
- Core<T>

FutureBase<T> (包含 Core<T>)
- Future<T> / SemiFuture<T>

这里的 Future 允许链式、poll 来查看是否成功等。SemiFuture 则是状态的描述。Future 有对应的 Executor,靠 KeepAlive 来指向 Executor,SemiFuture 则没有,除了 inline 执行(即在本线程就地执行),不会允许对应的逻辑出现。

CoreBase 描述了 Future 基本的对应状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
///   +----------------------------------------------------------------+
/// | ---> OnlyResult ----- |
/// | / \ |
/// | (setResult()) (setCallback()) |
/// | / \ |
/// | Start ---------> ------> Done |
/// | \ \ / |
/// | \ (setCallback()) (setResult()) |
/// | \ \ / |
/// | \ ---> OnlyCallback --- |
/// | \ or OnlyCallbackAllowInline |
/// | \ \ |
/// | (setProxy()) (setProxy()) |
/// | \ \ |
/// | \ ------> Empty |
/// | \ / |
/// | \ (setCallback()) |
/// | \ / |
/// | --------> Proxy ---------- |
/// +----------------------------------------------------------------+

因为要描述下一个任务这样的 chain,所以这套逻辑还是比较恶心的。此外,folly::Future 逻辑涉及了对应的 Executor,每个 future 会有 dispatch 给谁的逻辑,这个和链式一样,引入了复杂度。

Why are futures slow

future 在 C++ 中出现在 C++11 的标准库,而别的语言也常有类似的东西。它是由库来实现的。类似 std::functionstd::future 是堆上的、类型擦除的。这两部分抽象都带来了一定的开销。

之后会分别介绍 Seastar future、Rust future、C++ executors,来看看它们是怎么抽象的。简单来说:

  1. C++ executors 会有 sender / receiver,用模版类型来封装他们,然后提供了各种算法来做抽象
  2. Rust 靠 Future<T> 来抽象,Box<Future<T>> 来表示动态一些的对象。Future<T> 需要可以 poll(),上层的状态机来 polling,做查询
  3. seastar 靠模型的抽象,来简化了相关的逻辑。

References

  1. libcxx future

  2. folly future

  3. 浅谈The C++ Executors https://zhuanlan.zhihu.com/p/395250667 (绝世好文)