crossbeam-epoch

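crossbeam-epoch is the crossbeam sub-crate that manages memory; it implements epoch-based reclamation (EBR). Concurrent code runs into both the memory reclamation problem and the ABA problem, but solving reclamation essentially solves ABA too, because ABA is fundamentally caused by memory reuse.

Epoch-based reclamation is somewhat like reference counting, but at a much coarser granularity: it counts per epoch rather than per object, which makes it much cheaper to maintain and use. The observations it builds on are as follows: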

There are a few non-GC-based ways of managing memory for lock-free code, but they all come down to the same core observations:

  1. There are two sources of reachability at play – the data structure, and the snapshots in threads accessing it. Before we delete a node, we need to know that it cannot be reached in either of these ways.
  2. Once a node has been unlinked from the data structure, no new snapshots reaching it will be created.

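In short, an object can be unlinked only once. Once the last thread that might still read it has finished, it becomes reclaimable, and it is freed as the epoch advances.

crossbeam-epoch implements these semantics and introduces a small subsystem of types to represent the corresponding pointers and references.

The epoch scheme keeps track of:

  1. A global epoch counter (values 0, 1, 2; it cycles through three epochs)
  2. The garbage attached to each epoch
  3. Whether each thread is "active"
  4. Each thread's epoch

When a thread begins an operation, it sets its own epoch to the global epoch. When it unlinks an object, the object goes onto the global garbage list, in the slot for the current epoch. When the thread completes the operation, it clears its active flag. There are three epochs, and the epoch that can no longer have any readers is current - 2, so the objects attached to it can be collected.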
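The bookkeeping described above can be modeled in a few lines. This is a single-threaded toy sketch, not crossbeam's implementation: the names `ToyEbr`, `Participant`, `enter`, `exit`, `retire`, and `try_advance` are hypothetical, and string literals stand in for unlinked objects.

```rust
/// One participant (thread) in the toy model.
struct Participant {
    active: bool,
    epoch: usize,
}

/// Single-threaded toy model of the three-epoch scheme (all names hypothetical).
struct ToyEbr {
    global: usize,                   // global epoch: 0, 1, or 2
    garbage: [Vec<&'static str>; 3], // one garbage list per epoch
    participants: Vec<Participant>,
}

impl ToyEbr {
    fn new(threads: usize) -> Self {
        ToyEbr {
            global: 0,
            garbage: [Vec::new(), Vec::new(), Vec::new()],
            participants: (0..threads)
                .map(|_| Participant { active: false, epoch: 0 })
                .collect(),
        }
    }

    /// Begin an operation: set the active flag and adopt the global epoch.
    fn enter(&mut self, id: usize) {
        let global = self.global;
        let p = &mut self.participants[id];
        p.active = true;
        p.epoch = global;
    }

    /// Finish an operation: clear the active flag.
    fn exit(&mut self, id: usize) {
        self.participants[id].active = false;
    }

    /// Record an unlinked object on the current epoch's garbage list.
    fn retire(&mut self, obj: &'static str) {
        self.garbage[self.global].push(obj);
    }

    /// Advance the epoch if no active participant lags behind it.
    /// Returns the objects freed by the advance, or `None` if it was blocked.
    fn try_advance(&mut self) -> Option<Vec<&'static str>> {
        let global = self.global;
        if self.participants.iter().any(|p| p.active && p.epoch != global) {
            return None;
        }
        self.global = (global + 1) % 3;
        // The list two epochs behind the new epoch can have no readers left.
        let reclaim = (self.global + 1) % 3;
        Some(std::mem::take(&mut self.garbage[reclaim]))
    }
}

fn main() {
    let mut ebr = ToyEbr::new(1);
    ebr.enter(0);       // participant 0 starts reading in epoch 0
    ebr.retire("node"); // "node" is unlinked while it may still be read
    assert_eq!(ebr.try_advance(), Some(Vec::new())); // 0 -> 1, nothing to free
    assert_eq!(ebr.try_advance(), None);             // blocked by participant 0
    ebr.exit(0);
    assert_eq!(ebr.try_advance(), Some(vec!["node"])); // 1 -> 2 frees epoch 0
}
```

The object retired in epoch 0 is only handed back after two advances, and the second advance cannot happen while the reader that entered in epoch 0 is still active.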

For performance, see: Performance of memory reclamation for lockless synchronization

API of crossbeam-epoch

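pin() produces a Guard for the current thread. As long as the Guard has not been dropped, the thread counts as active; in effect it is an RAII handle for one (thread, epoch) pair.

Then there are the following smart pointers: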
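The RAII idea can be shown without the crate. In this minimal sketch, `ToyGuard`, `pin`, `is_pinned`, and the thread-local `GUARD_COUNT` are hypothetical stand-ins; the real Guard also carries epoch state, which is left out here.

```rust
use std::cell::Cell;
use std::marker::PhantomData;

thread_local! {
    /// Number of live guards on this thread (hypothetical per-thread state).
    static GUARD_COUNT: Cell<usize> = Cell::new(0);
}

/// RAII token: while at least one exists, the current thread counts as pinned.
struct ToyGuard {
    // The raw-pointer marker makes the guard !Send, so it is dropped on the
    // thread that created it and updates the right counter.
    _not_send: PhantomData<*mut ()>,
}

fn pin() -> ToyGuard {
    GUARD_COUNT.with(|c| c.set(c.get() + 1));
    ToyGuard { _not_send: PhantomData }
}

fn is_pinned() -> bool {
    GUARD_COUNT.with(|c| c.get() > 0)
}

impl Drop for ToyGuard {
    fn drop(&mut self) {
        GUARD_COUNT.with(|c| c.set(c.get() - 1));
    }
}

fn main() {
    assert!(!is_pinned());
    {
        let _outer = pin();
        let _inner = pin(); // nested pins only bump the counter
        assert!(is_pinned());
    }
    // Both guards are gone, so the thread is no longer pinned.
    assert!(!is_pinned());
}
```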

To put the Guard to use, Crossbeam provides a set of three pointer types meant to work together:

  • Owned<T>, akin to Box<T>, which points to uniquely-owned data that has not yet been published in a concurrent data structure.
  • Shared<'a, T>, akin to &'a T, which points to shared data that may or may not be reachable from a data structure, but is guaranteed not to be freed during lifetime 'a.
  • Atomic<T>, akin to std::sync::atomic::AtomicPtr, which provides atomic updates to a pointer using the Owned and Shared types, and connects them to a Guard.

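These types are enough to express the scheme; for details see: https://aturon.github.io/blog/2015/08/27/epoch/

Some key points (from the "Managing garbage" section of that post):

  1. The destructor can run at unlink time, while EBR only reclaims the memory itself (this is safe because every such operation goes through a CAS)
  2. Each thread keeps thread-local garbage lists, which may be flushed to the global list once they pass a threshold
  3. epoch::pin may flush garbage and may even trigger a collection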

Code

Tools

➜  src git:(master) tree
.
├── atomic.rs
├── collector.rs
├── default.rs
├── deferred.rs
├── epoch.rs
├── guard.rs
├── internal.rs
├── lib.rs
└── sync
    ├── list.rs
    ├── mod.rs
    └── queue.rs

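First, let's look at the supporting tools, namely the sync directory. It feels slightly circular: it uses crossbeam-epoch to implement two components:

  1. A concurrent linked-list queue, implemented following http://dl.acm.org/citation.cfm?id=248106 and https://doi.org/10.1007/978-3-540-30232-2_7
  2. A concurrent intrusive linked list: http://www.cs.tau.ac.il/~afek/p73-Lock-Free-HashTbls-michael.pdf

These two can be read as example uses of the library, and crossbeam-epoch itself also relies on both.

The Queue interface is fairly simple: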

impl<T> Queue<T> {
    /// Create a new, empty queue.
    pub(crate) fn new() -> Queue<T>;

    /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
    pub(crate) fn push(&self, t: T, guard: &Guard);

    /// Attempts to dequeue from the front.
    ///
    /// Returns `None` if the queue is observed to be empty.
    pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T>;

    /// Attempts to dequeue from the front, if the item satisfies the given condition.
    ///
    /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
    /// condition.
    pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
    where
        T: Sync,
        F: Fn(&T) -> bool;
}
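To make the contract of `try_pop_if` concrete, here is a lock-based stand-in with the same call shape minus the `Guard` argument. It is not lock-free and is not how the queue is implemented; `ToyQueue` is a hypothetical name, and a `Mutex<VecDeque<T>>` is swapped in purely to show the behavior.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Lock-based stand-in for the queue interface (hypothetical, for illustration).
struct ToyQueue<T> {
    inner: Mutex<VecDeque<T>>,
}

impl<T> ToyQueue<T> {
    fn new() -> Self {
        ToyQueue { inner: Mutex::new(VecDeque::new()) }
    }

    /// Adds `t` to the back of the queue.
    fn push(&self, t: T) {
        self.inner.lock().unwrap().push_back(t);
    }

    /// Removes and returns the head, or `None` if the queue is empty.
    fn try_pop(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }

    /// Pops the head only if `condition` holds for it; never looks past the head.
    fn try_pop_if<F: Fn(&T) -> bool>(&self, condition: F) -> Option<T> {
        let mut q = self.inner.lock().unwrap();
        let head_matches = q.front().map_or(false, |head| condition(head));
        if head_matches { q.pop_front() } else { None }
    }
}

fn main() {
    let q = ToyQueue::new();
    q.push(1);
    q.push(2);
    // The head is 1, so a condition that only 2 satisfies pops nothing.
    assert_eq!(q.try_pop_if(|x| *x == 2), None);
    assert_eq!(q.try_pop_if(|x| *x == 1), Some(1));
    assert_eq!(q.try_pop(), Some(2));
    assert_eq!(q.try_pop(), None);
}
```

Because only the head is ever inspected, a consumer that pops "while the head is old enough" stops at the first entry that is too new, which fits a queue whose entries are pushed in roughly increasing epoch order.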

List, on the other hand, is an intrusive structure; its elements need to implement IsElement:

/// An entry in a linked list.
///
/// An Entry is accessed from multiple threads, so it would be beneficial to put it in a different
/// cache-line than thread-local data in terms of performance.
#[derive(Debug)]
pub(crate) struct Entry;

/// Implementing this trait asserts that the type `T` can be used as an element in the intrusive
/// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance
/// of `Entry`.
///
/// # Example
///
/// ```ignore
/// struct A {
///     entry: Entry,
///     data: usize,
/// }
///
/// impl IsElement<A> for A {
///     fn entry_of(a: &A) -> &Entry {
///         let entry_ptr = ((a as usize) + offset_of!(A, entry)) as *const Entry;
///         unsafe { &*entry_ptr }
///     }
///
///     unsafe fn element_of(entry: &Entry) -> &T {
///         let elem_ptr = ((entry as usize) - offset_of!(A, entry)) as *const T;
///         &*elem_ptr
///     }
///
///     unsafe fn finalize(entry: &Entry, guard: &Guard) {
///         guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
///     }
/// }
/// ```
///
/// This trait is implemented on a type separate from `T` (although it can be just `T`), because
/// one type might be placeable into multiple lists, in which case it would require multiple
/// implementations of `IsElement`. In such cases, each struct implementing `IsElement<T>`
/// represents a distinct `Entry` in `T`.
///
/// For example, we can insert the following struct into two lists using `entry1` for one
/// and `entry2` for the other:
///
/// ```ignore
/// struct B {
///     entry1: Entry,
///     entry2: Entry,
///     data: usize,
/// }
/// ```
///
pub(crate) trait IsElement<T> {
    /// Returns a reference to this element's `Entry`.
    fn entry_of(_: &T) -> &Entry;

    /// Given a reference to an element's entry, returns that element.
    ///
    /// ```ignore
    /// let elem = ListElement::new();
    /// assert_eq!(elem.entry_of(),
    ///            unsafe { ListElement::element_of(elem.entry_of()) } );
    /// ```
    ///
    /// # Safety
    ///
    /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance
    /// of the element type (`T`).
    unsafe fn element_of(_: &Entry) -> &T;

    /// The function that is called when an entry is unlinked from list.
    ///
    /// # Safety
    ///
    /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance
    /// of the element type (`T`).
    unsafe fn finalize(_: &Entry, _: &Guard);
}

/// A lock-free, intrusive linked list of type `T`.
#[derive(Debug)]
pub(crate) struct List<T, C: IsElement<T> = T>;

This structure is about as intrusive as it gets, which is impressive.
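The core trick of an intrusive list is converting between an element and the link embedded in it. The sketch below shows that round trip with `std::mem::offset_of!` (stable since Rust 1.77) and raw pointers. The types `A` and `Entry` here are simplified, hypothetical versions, and the free functions only mirror the trait methods' names.

```rust
use std::mem::offset_of;

/// Link embedded in the element (simplified stand-in for the list's `Entry`).
#[allow(dead_code)]
struct Entry {
    next: *const Entry,
}

/// An element that carries its own link.
struct A {
    data: usize,
    entry: Entry,
}

/// Element pointer -> pointer to the embedded entry.
fn entry_of(a: *const A) -> *const Entry {
    (a as *const u8).wrapping_add(offset_of!(A, entry)) as *const Entry
}

/// Entry pointer -> pointer to the enclosing element.
/// Only meaningful if `entry` really is the `entry` field of some `A`.
fn element_of(entry: *const Entry) -> *const A {
    (entry as *const u8).wrapping_sub(offset_of!(A, entry)) as *const A
}

fn main() {
    let a = A { data: 7, entry: Entry { next: std::ptr::null() } };
    let e = entry_of(&a);
    let back = element_of(e);
    // The round trip lands on the original element.
    assert!(std::ptr::eq(back, &a));
    // Safety: `back` points at `a`, which is alive for the whole function.
    assert_eq!(unsafe { (*back).data }, 7);
}
```

The list itself only ever sees `Entry` links; the element type is recovered by pointer arithmetic, which is why `element_of` is marked `unsafe` in the trait quoted above.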

Epoch

Next come the epoch-related types; the following is from epoch.rs:

//! The global epoch
//!
//! The last bit in this number is unused and is always zero. Every so often the global epoch is
//! incremented, i.e. we say it "advances". A pinned participant may advance the global epoch only
//! if all currently pinned participants have been pinned in the current epoch.
//!
//! If an object became garbage in some epoch, then we can be sure that after two advancements no
//! participant will hold a reference to it. That is the crux of safe memory reclamation.

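In this representation, the last bit of an Epoch records whether it is pinned. That bit is meaningless in the global epoch, but per-thread (local) epochs rely on it; the remaining bits hold the actual epoch number: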

/// An epoch that can be marked as pinned or unpinned.
///
/// Internally, the epoch is represented as an integer that wraps around at some unspecified point
/// and a flag that represents whether it is pinned or unpinned.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
pub(crate) struct Epoch {
    /// The least significant bit is set if pinned. The rest of the bits hold the epoch.
    data: usize,
}
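A small sketch of how such an encoding can be manipulated, written from the description above rather than copied from epoch.rs. `ToyEpoch` is a hypothetical name; the method names follow calls that appear elsewhere in this post (`starting`, `is_pinned`, `unpinned`, `successor`, `wrapping_sub`), but the bodies are assumptions.

```rust
/// Toy encoding: bit 0 is the pinned flag, the upper bits are the epoch number.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct ToyEpoch {
    data: usize,
}

impl ToyEpoch {
    /// Epoch zero, unpinned.
    fn starting() -> Self {
        ToyEpoch { data: 0 }
    }

    fn is_pinned(self) -> bool {
        self.data & 1 == 1
    }

    /// Same epoch number with the pinned flag set.
    fn pinned(self) -> Self {
        ToyEpoch { data: self.data | 1 }
    }

    /// Same epoch number with the pinned flag cleared.
    fn unpinned(self) -> Self {
        ToyEpoch { data: self.data & !1 }
    }

    /// Next epoch: adding 2 leaves the flag bit untouched.
    fn successor(self) -> Self {
        ToyEpoch { data: self.data.wrapping_add(2) }
    }

    /// Signed distance in epochs; the shift discards the flag bit.
    fn wrapping_sub(self, rhs: Self) -> isize {
        (self.data.wrapping_sub(rhs.data & !1) as isize) >> 1
    }
}

fn main() {
    let e0 = ToyEpoch::starting();
    let e2 = e0.successor().successor();
    assert!(!e0.is_pinned());
    assert!(e0.pinned().is_pinned());
    assert_eq!(e0.pinned().unpinned(), e0);
    // Two advancements apart, whether or not either side is pinned.
    assert_eq!(e2.wrapping_sub(e0), 2);
    assert_eq!(e2.pinned().wrapping_sub(e0.pinned()), 2);
}
```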

On top of this there is a wrapper, AtomicEpoch, which provides load, store, and CAS operations on an Epoch:

/// An atomic value that holds an `Epoch`.
#[derive(Default, Debug)]
pub(crate) struct AtomicEpoch {
    /// Since `Epoch` is just a wrapper around `usize`, an `AtomicEpoch` is similarly represented
    /// using an `AtomicUsize`.
    data: AtomicUsize,
}
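The wrapper pattern is thin enough to sketch in full. `ToyEpoch` and `ToyAtomicEpoch` are hypothetical names, and the exact method set and signatures of the real AtomicEpoch are not shown in this post, so the `compare_exchange` below is an assumption modeled on `AtomicUsize`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct ToyEpoch {
    data: usize,
}

/// Atomic cell holding a `ToyEpoch`, stored as its raw `usize`.
struct ToyAtomicEpoch {
    data: AtomicUsize,
}

impl ToyAtomicEpoch {
    fn new(epoch: ToyEpoch) -> Self {
        ToyAtomicEpoch { data: AtomicUsize::new(epoch.data) }
    }

    fn load(&self, ord: Ordering) -> ToyEpoch {
        ToyEpoch { data: self.data.load(ord) }
    }

    fn store(&self, epoch: ToyEpoch, ord: Ordering) {
        self.data.store(epoch.data, ord);
    }

    /// Returns `Ok(previous)` on success, `Err(actual)` if `current` did not match.
    fn compare_exchange(
        &self,
        current: ToyEpoch,
        new: ToyEpoch,
        success: Ordering,
        failure: Ordering,
    ) -> Result<ToyEpoch, ToyEpoch> {
        self.data
            .compare_exchange(current.data, new.data, success, failure)
            .map(|data| ToyEpoch { data })
            .map_err(|data| ToyEpoch { data })
    }
}

fn main() {
    let e = |data| ToyEpoch { data };
    let cell = ToyAtomicEpoch::new(e(0));
    // The first CAS sees the expected value and succeeds.
    assert_eq!(cell.compare_exchange(e(0), e(2), Ordering::SeqCst, Ordering::SeqCst), Ok(e(0)));
    // The second CAS still expects 0 and fails, reporting the actual value.
    assert_eq!(cell.compare_exchange(e(0), e(4), Ordering::SeqCst, Ordering::SeqCst), Err(e(2)));
    cell.store(e(6), Ordering::SeqCst);
    assert_eq!(cell.load(Ordering::SeqCst), e(6));
}
```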

Deferred

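Deferred is a piece of deferred data together with its destructor. My first thought was that a Box<Fn(...)> would do, but this version is more general: as the doc comment says, it stores small closures inline and only boxes larger ones. Its essential parts are: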

/// Number of words a piece of `Data` can hold.
///
/// Three words should be enough for the majority of cases. For example, you can fit inside it the
/// function pointer together with a fat pointer representing an object that needs to be destroyed.
const DATA_WORDS: usize = 3;

/// Some space to keep a `FnOnce()` object on the stack.
type Data = [usize; DATA_WORDS];

/// A `FnOnce()` that is stored inline if small, or otherwise boxed on the heap.
///
/// This is a handy way of keeping an unsized `FnOnce()` within a sized structure.
pub(crate) struct Deferred {
    call: unsafe fn(*mut u8),
    data: MaybeUninit<Data>,
    _marker: PhantomData<*mut ()>, // !Send + !Sync
}

impl Deferred {
    /// Constructs a new `Deferred` from a `FnOnce()`.
    pub(crate) fn new<F: FnOnce()>(f: F) -> Self;

    /// Calls the function.
    #[inline]
    pub(crate) fn call(mut self) {
        let call = self.call;
        unsafe { call(self.data.as_mut_ptr() as *mut u8) };
    }
}

This is fairly self-explanatory and quite simple.
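For comparison, here is the plain boxed alternative mentioned above. It is a sketch of the simpler design, not of Deferred itself: `BoxedDeferred` is a hypothetical name, and unlike the inline version it allocates for every closure, however small.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Heap-allocating alternative: every closure is boxed, whatever its size.
struct BoxedDeferred {
    call: Box<dyn FnOnce()>,
}

impl BoxedDeferred {
    fn new<F: FnOnce() + 'static>(f: F) -> Self {
        BoxedDeferred { call: Box::new(f) }
    }

    /// Runs the stored closure, consuming the deferred value.
    fn call(self) {
        (self.call)()
    }
}

fn main() {
    let ran = Rc::new(Cell::new(false));
    let flag = Rc::clone(&ran);
    let deferred = BoxedDeferred::new(move || flag.set(true));
    assert!(!ran.get()); // nothing runs at construction time
    deferred.call();
    assert!(ran.get());
}
```

The trade-off is one allocation per deferred function versus a fixed three-word inline buffer with a fallback to the heap.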

Bag

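internal.rs implements Bag and SealedBag. A Bag is a collection of Deferred functions, and a SealedBag is a Bag stamped with a specific epoch. The module documentation describes them as follows: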

//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storages
//! for amortizing the synchronization cost of pushing the garbages to a global queue.

The code:

/// Maximum number of objects a bag can contain.
#[cfg(not(crossbeam_sanitize))]
const MAX_OBJECTS: usize = 62;
#[cfg(crossbeam_sanitize)]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub(crate) struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}

impl Bag {
    /// Returns a new, empty bag.
    pub(crate) fn new() -> Self;

    /// Returns `true` if the bag is empty.
    pub(crate) fn is_empty(&self) -> bool;

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred>;

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag;
}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    _bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}
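The life cycle of a bag (fill, seal with an epoch, run once expired) can be sketched with ordinary collections. Everything here is a simplified assumption: `ToyBag`, `ToySealedBag`, and `ToyDeferred` are hypothetical names, epochs are plain counters without a flag bit, a `Vec` replaces the fixed array, and the explicit `run` stands in for the work that dropping a sealed bag performs in `collect`.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Capacity of a bag; kept tiny so the "full" case is easy to reach.
const MAX_OBJECTS: usize = 4;

type ToyDeferred = Box<dyn FnOnce()>;

struct ToyBag {
    deferreds: Vec<ToyDeferred>,
}

struct ToySealedBag {
    epoch: usize,
    bag: ToyBag,
}

impl ToyBag {
    fn new() -> Self {
        ToyBag { deferreds: Vec::new() }
    }

    /// Hands the closure back when the bag is full.
    fn try_push(&mut self, deferred: ToyDeferred) -> Result<(), ToyDeferred> {
        if self.deferreds.len() < MAX_OBJECTS {
            self.deferreds.push(deferred);
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Stamps the bag with the epoch in which it was sealed.
    fn seal(self, epoch: usize) -> ToySealedBag {
        ToySealedBag { epoch, bag: self }
    }
}

impl ToySealedBag {
    /// A bag is safe to run once the global epoch is at least two ahead.
    fn is_expired(&self, global_epoch: usize) -> bool {
        (global_epoch.wrapping_sub(self.epoch) as isize) >= 2
    }

    /// Executes every deferred function in the bag.
    fn run(self) {
        for deferred in self.bag.deferreds {
            deferred();
        }
    }
}

fn main() {
    let hits = Rc::new(Cell::new(0));
    let mut bag = ToyBag::new();
    for _ in 0..MAX_OBJECTS {
        let h = Rc::clone(&hits);
        assert!(bag.try_push(Box::new(move || h.set(h.get() + 1))).is_ok());
    }
    assert!(bag.try_push(Box::new(|| ())).is_err()); // the bag is full
    let sealed = bag.seal(3);
    assert!(!sealed.is_expired(4)); // one advancement is not enough
    assert!(sealed.is_expired(5));  // two advancements are
    sealed.run();
    assert_eq!(hits.get(), 4);
}
```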

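External interface

Much of the external interface lives in default.rs. Readers may wonder why we jump from utility types straight to the external interface. The answer is that the remaining pieces are tightly interwoven and are easier to explain top-down: finish the tools first, then work down from the top.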

https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-epoch/src/default.rs

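It contains:

  1. A global, lazy_static Collector
  2. One LocalHandle per thread, created by COLLECTOR.register()

The public pin function then fetches the thread-local LocalHandle and pins through it.

Collector and LocalHandle are in turn wrappers around the internal Global and Local: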
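The "one global collector, one lazily registered handle per thread" layout can be sketched with the standard library alone. This is an illustration of the pattern, not the contents of default.rs: `ToyCollector`, `ToyHandle`, `collector`, and `my_id` are hypothetical, and `std::sync::OnceLock` is used here in place of lazy_static.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Global state shared by all threads.
struct ToyCollector {
    registered: AtomicUsize,
}

/// Per-thread handle handed out by the collector.
struct ToyHandle {
    id: usize,
}

impl ToyCollector {
    /// Registers a new participant and returns its handle.
    fn register(&self) -> ToyHandle {
        ToyHandle { id: self.registered.fetch_add(1, Ordering::Relaxed) }
    }
}

/// The lazily initialized global collector.
fn collector() -> &'static ToyCollector {
    static COLLECTOR: OnceLock<ToyCollector> = OnceLock::new();
    COLLECTOR.get_or_init(|| ToyCollector { registered: AtomicUsize::new(0) })
}

thread_local! {
    /// Created on first use in each thread by registering with the collector.
    static HANDLE: ToyHandle = collector().register();
}

/// Public entry point: always goes through the calling thread's own handle.
fn my_id() -> usize {
    HANDLE.with(|h| h.id)
}

fn main() {
    let here = my_id();
    assert_eq!(here, my_id()); // same thread, same handle
    let there = std::thread::spawn(my_id).join().unwrap();
    assert_ne!(here, there);   // another thread registered its own handle
}
```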

/// An epoch-based garbage collector.
pub struct Collector {
    pub(crate) global: Arc<Global>,
}

impl Collector {
    /// Creates a new collector.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a new handle for the collector.
    pub fn register(&self) -> LocalHandle {
        Local::register(self)
    }
}

/// A handle to a garbage collector.
pub struct LocalHandle {
    pub(crate) local: *const Local,
}

impl LocalHandle {
    /// Pins the handle.
    #[inline]
    pub fn pin(&self) -> Guard {
        unsafe { (*self.local).pin() }
    }

    /// Returns `true` if the handle is pinned.
    #[inline]
    pub fn is_pinned(&self) -> bool {
        unsafe { (*self.local).is_pinned() }
    }

    /// Returns the `Collector` associated with this handle.
    #[inline]
    pub fn collector(&self) -> &Collector {
        unsafe { (*self.local).collector() }
    }
}

impl Drop for LocalHandle {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            Local::release_handle(&*self.local);
        }
    }
}

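These two are essentially nested wrappers, so in the end we have to look at internal.rs, which mainly contains Global and Local:

Global is the pool of global state, wrapped by Collector as shown above. It looks roughly like this: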

/// The global data for a garbage collector.
pub(crate) struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

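Global holds:

  1. A queue of SealedBag, which carries the deferred functions
  2. List<Local>, the intrusive linked list of registered threads (participants)
  3. epoch, the global atomic epoch, whose last bit is unused

Global's methods are worth quoting in full: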

impl Global {
    /// Number of bags to destroy.
    const COLLECT_STEPS: usize = 8;

    /// Creates a new global data for garbage collection.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub(crate) fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(crossbeam_sanitize) {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

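Most of these functions are straightforward; the harder one is try_advance. If it finds that every currently pinned participant is pinned in the current global epoch, it advances the epoch. This is a little subtle, so let's recap: advancing is allowed because the only remaining readers belong to the current epoch, so objects retired two or more epochs ago can be collected. This is tied to how pin works. In short, once every pinned Local is in the current epoch, the epoch can advance and collection can proceed. See collect for the reclamation path.

As for Local, it is actually not very RAII-like: it is created in register, initialized in pin, and destroyed via release_handle once the last handle is released and no guards remain. It is used through TLS and is bound to a thread: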
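The advance rule can be isolated from the list traversal and the fences. The sketch below is a simplification under stated assumptions: epochs are plain `usize` values using the encoding described earlier (bit 0 is the pinned flag), the participants are passed as a slice instead of a lock-free list, and the free function `try_advance` is a hypothetical stand-in, not the method quoted above.

```rust
/// Bit 0 of an encoded epoch is the pinned flag.
fn is_pinned(epoch: usize) -> bool {
    epoch & 1 == 1
}

/// Clears the pinned flag, leaving only the epoch number.
fn unpinned(epoch: usize) -> usize {
    epoch & !1
}

/// Returns the value the global epoch holds after one advance attempt.
/// `global` must be an unpinned (even) value.
fn try_advance(global: usize, locals: &[usize]) -> usize {
    for &local in locals {
        // A participant pinned in an older epoch blocks the advance.
        // Unpinned participants never block, whatever value they hold.
        if is_pinned(local) && unpinned(local) != global {
            return global;
        }
    }
    // Adding 2 moves to the next epoch without touching the flag bit.
    global.wrapping_add(2)
}

fn main() {
    // One participant pinned in epoch 4, two unpinned: the epoch advances.
    assert_eq!(try_advance(4, &[5, 2, 4]), 6);
    // A participant still pinned in epoch 2 blocks the advance from 4.
    assert_eq!(try_advance(4, &[5, 3]), 4);
    // No participants at all: nothing can block.
    assert_eq!(try_advance(4, &[]), 6);
}
```

Note that the condition is about pinned participants only; an idle thread does not hold the epoch back.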

/// Participant for garbage collection.
pub(crate) struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// The local epoch.
    epoch: AtomicEpoch,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,
}

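Notice that it has an epoch of its own. Indeed, this field is only set when the participant is pinned. There is also a repin operation, presumably there as an optimization.

Finally there is Guard, which is just a thin component bound to a Local. Note that most of the Local and Guard interface is used by a single thread only; the one concurrent access is other threads reading its epoch. A Local communicates with Global only through epoch and collector.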
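How the local epoch and the guard count might interact can be sketched as follows. This is an assumption-laden simplification, not the code in internal.rs: `ToyLocal` is a hypothetical type, every atomic access uses `SeqCst` instead of tuned orderings and fences, and `pin`/`unpin` return nothing instead of producing a Guard.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Per-thread participant state (hypothetical, simplified).
struct ToyLocal<'g> {
    global_epoch: &'g AtomicUsize, // shared; always an even (unpinned) value
    epoch: AtomicUsize,            // read by other threads during an advance check
    guard_count: Cell<usize>,      // touched only by the owning thread
}

impl<'g> ToyLocal<'g> {
    fn new(global_epoch: &'g AtomicUsize) -> Self {
        ToyLocal {
            global_epoch,
            epoch: AtomicUsize::new(0),
            guard_count: Cell::new(0),
        }
    }

    fn pin(&self) {
        let count = self.guard_count.get();
        self.guard_count.set(count + 1);
        if count == 0 {
            // First guard: publish "pinned in the current global epoch".
            let global = self.global_epoch.load(Ordering::SeqCst);
            self.epoch.store(global | 1, Ordering::SeqCst);
        }
    }

    fn unpin(&self) {
        let count = self.guard_count.get();
        self.guard_count.set(count - 1);
        if count == 1 {
            // Last guard gone: clear the pinned flag.
            self.epoch.store(0, Ordering::SeqCst);
        }
    }
}

fn main() {
    let global = AtomicUsize::new(4);
    let local = ToyLocal::new(&global);
    local.pin();
    assert_eq!(local.epoch.load(Ordering::SeqCst), 5); // epoch 4, pinned
    global.store(6, Ordering::SeqCst);
    local.pin(); // nested pin: the published epoch does not change
    assert_eq!(local.epoch.load(Ordering::SeqCst), 5);
    local.unpin();
    assert_eq!(local.epoch.load(Ordering::SeqCst), 5);
    local.unpin();
    assert_eq!(local.epoch.load(Ordering::SeqCst), 0); // unpinned again
}
```

Under this model, the local epoch is only meaningful between the first pin and the last unpin, and a nested pin keeps the epoch of the outermost one.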

Reference