brpc bthread execution-queue

Execution Queue 最早是 brpc 中,多个线程往一个 fd 写数据的时候使用的,这里的需求是:

  1. 每个写应该串行的完成
  2. 会有并发的写.

Execution Queue 会接受用户投递的任务, 投递完成之后, 用户线程就直接返回了, 然后自己把任务做完, 可能要靠用户的 callback 来通知用户完成. 在内部, 任务会给给投递给它的任务组 batch, 任务 submit 给一个线程异步 batch 处理.

此外, bthread 的 execution queue 还支持 cancel 任务和调度高优先级的任务.

Usage

src/bthread/execution_queue.h 描述了如何使用 execution queue, 这些东西也可以在项目文档里看到.

这里可以简单介绍一下这个流程,大概要实现一个 Batch 的 demo_execute 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

// Iterate over the given tasks
//
// Examples:

void user_fn(T);

int demo_execute(void* meta, TaskIterator<T>& iter) {
if (iter.is_queue_stopped()) {
// destroy meta and related resources
return 0;
}
for (; iter; ++iter) {
// user_fn(*iter)
// or user_fn(iter->a_member_of_T)
}
return 0;
}

这里 T 是用户定义的 task 需要处理的对象的类型。这个有点绕,因为你还要定义 demo_execute 来处理一个 batch 的任务,和 TaskIterator 交互。举个例子理解下,比如说,对于用户的 IO 写同一个 fd 的任务,brpc 定义了一个 T = butil::IOBuf* 的例子,来处理 IO 相关的需求. 其实这个就很好理解了吧。

Execution Queue 的生命周期

Execution Queue 本身会在 bthread 中执行,bthread 本身是有一堆参数的,queue 会有一堆参数:

1
2
3
4
5
6
7
8
9
10
11
struct ExecutionQueueOptions {
ExecutionQueueOptions();
// Attribute of the bthread which execute runs on
// default: BTHREAD_ATTR_NORMAL
bthread_attr_t bthread_attr;

// Executor that tasks run on. bthread will be used when executor = NULL.
// Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
// Executor is in-place(synchronous).
Executor * executor;
};

这个 Executor 是一个 用户定义的 Executor,用户可以定义在自己的线程池之类的来执行,不过如果用户自己定义了一个 Executor,比如绑了个 folly 的 Executor 然后丢给 folly 执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
// the default options.
// Returns 0 on success, errno otherwise
// NOTE: type |T| can be non-POD but must be copy-constructible
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>& iter),
void* meta);

template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);

template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);

上面任务有 start, stop, 和 join。基本上是一个完整的流程。ExecutionQueueId<T> 是类似 bthread_id 一样的引用,关于这个结构,其实可以参考 brpc 的 memory-management 文档:https://github.com/apache/incubator-brpc/blob/master/docs/cn/memory_management.md 。这里基本上是 4B 地址 + 4B 版本号。

提交任务

提交任务有下列的 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Thread-safe and Wait-free.
// Execute a task with options. e.g
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, we will use default options (normal task)
// If |handle| is not NULL, we will assign it with the handler of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle);

// [Thread safe and ABA free] Cancel the corresponding task.
// Returns:
// -1: The task was executed or h is an invalid handle
// 0: Success
// 1: The task is executing
int execution_queue_cancel(const TaskHandle& h);

这里 execution_queue_execute 可以传入一个参数,带上一个 TaskOptions,也可以拿到一个 TaskHandle,这个 handle 可以用来取消任务。下面在 TaskOptions 还有一些执行相关的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

struct TaskOptions {
TaskOptions();
TaskOptions(bool high_priority, bool in_place_if_possible);

// Executor would execute high-priority tasks in the FIFO order but before
// all pending normal-priority tasks.
// NOTE: We don't guarantee any kind of real-time as there might be tasks still
// in process which are uninterruptible.
//
// Default: false
bool high_priority;

// If |in_place_if_possible| is true, execution_queue_execute would call
// execute immediately instead of starting a bthread if possible
//
// Note: Running callbacks in place might cause the dead lock issue, you
// should be very careful turning this flag on.
//
// Default: false
bool in_place_if_possible;
};


const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(false, false);
const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(true, false);
const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true);

这里我们可以看到对应的优先级和 inplaceinplace 会在前台执行,high_priority 会投递到高优执行,不过这里还满足串行化的语义。

这里 cancel 接口有点让人困惑,没有被执行的任务可以被 cancel,这个得对着代码来理解了。

实现

从用户提交任务到 lock-free 的添加到任务列表

这里的实现可以在 execution_queue_inl.hexecution_queue.cc 里面,关键类型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template <typename T>
struct ExecutionQueueId {
uint64_t value;
};

// 任务的状态
enum TaskStatus {
UNEXECUTED = 0,
EXECUTING = 1,
EXECUTED = 2
};

struct TaskNode;
class ExecutionQueueBase;
typedef void (*clear_task_mem)(TaskNode*);

struct BAIDU_CACHELINE_ALIGNMENT TaskNode;

template <typename T>
class ExecutionQueue : public ExecutionQueueBase;

class TaskIteratorBase;

template <typename T>
class TaskIterator : public TaskIteratorBase;

ExecutionQueue 包了一层 ExecutionQueueBaseExecutionQueueBase 的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class BAIDU_CACHELINE_ALIGNMENT ExecutionQueueBase {
DISALLOW_COPY_AND_ASSIGN(ExecutionQueueBase);
struct Forbidden {};


private:

// Don't change the order of _head, _versioned_ref and _stopped unless you
// see improvement of performance in test
butil::atomic<TaskNode*> BAIDU_CACHELINE_ALIGNMENT _head; // FIFO 的单队列, 这是队列的尾部, 通过 cas 来 enqueue.
butil::atomic<uint64_t> BAIDU_CACHELINE_ALIGNMENT _versioned_ref; // 整个对列的 _versioned_ref 计数器, 用来处理 aba 问题.
butil::atomic<bool> BAIDU_CACHELINE_ALIGNMENT _stopped; // 队列是否被外部停止.
butil::atomic<int64_t> _high_priority_tasks; // 高优任务计数器, 对列和执行里有任何高优任务都会添加这个计数器.
uint64_t _this_id;
void* _meta; // 用户定义的, 绑定到整个 queue 的成员, 作为 execution_func 的参数.
void* _type_specific_function; // 对应用户的 execution_func
execute_func_t _execute_func; // 用户提供的执行函数
clear_task_mem _clear_func; // 清除 Task 上的东西(用户提供) 和 Task 挂的节点的内存.
ExecutionQueueOptions _options;
butil::atomic<int>* _join_butex;
};

ExecutionQueueBase 被实现成一个类似栈的结构,我也说不清是栈还是队列,因为这个地方 enqueue 类似栈,处理类似队列。

这里的入口是:

1
2
3
4
5
6
7
8
9
10
11
12
13
template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle) {
typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->execute(task, options, handle);
} else {
return EINVAL;
}
}

我们可以梳理一下执行的链路:

  1. execution_queue_execute : 用户提交任务到某个 execution queue,派发给 ExecutionQueue<T> 调用 execute,来具体执行
  2. ExecutionQueue<T>::execute 中,这里会针对 T 创建对应的 TaskNode,初始化内存和资源,然后投递给 ExecutionQueueBase::start_execute

那么走到最重要的内容 ExecutionQueueBase 了,刚刚我们看到了,它有个 head_ 成员,是个 atomic<TaskNode*>,然后还有 _high_priority_tasks_stopped 表示 “是否有高优操作” 和 “是否被停止”。这两个配置和 start_execute 等函数就是这个结构的核心了。我们接着看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//! 创建好任务后, 会投递到 start_execute 来处理
void ExecutionQueueBase::start_execute(TaskNode* node) {
node->next = TaskNode::UNCONNECTED;
node->status = UNEXECUTED;
node->iterated = false;

// _high_priority_tasks 是表示正在执行的/等待的任务是否有高优先级任务.
if (node->high_priority) {
// Add _high_priority_tasks before pushing this task into queue to
// make sure that _execute_tasks sees the newest number when this
// task is in the queue. Although there might be some useless for
// loops in _execute_tasks if this thread is scheduled out at this
// point, we think it's just fine.
_high_priority_tasks.fetch_add(1, butil::memory_order_relaxed);
}

// 拿到上一次的 prev_head, 这个时候, 竞选要求只有 prev_head 是 nullptr 的时候,
// 才能成为 group 的 leader.
// 这里如果没有成为 group 的 leader, 会在这里堆积.

TaskNode* const prev_head = _head.exchange(node, butil::memory_order_release);
if (prev_head != NULL) {
node->next = prev_head;
return;
}

// Group 的 Leader 有权限执行任务.
//
// Get the right to execute the task, start a bthread to avoid deadlock
// or stack overflow
//
// 因为是 leader, 它的 next 应该是 null.
node->next = NULL;
node->q = this;

ExecutionQueueVars* const vars = get_execq_vars();
vars->execq_active_count << 1;
if (node->in_place) { // 如果是 in-place, 那么在本线程(bthread) 执行.
int niterated = 0;
_execute(node, node->high_priority, &niterated);
TaskNode* tmp = node;
// return if no more
// 如果是高优任务, 那么处理一下
if (node->high_priority) {
_high_priority_tasks.fetch_sub(niterated, butil::memory_order_relaxed);
}
// 如果有 more_tasks, 切到别的线程执行这个 group.
if (!_more_tasks(tmp, &tmp, !node->iterated)) {
vars->execq_active_count << -1;
return_task_node(node);
return;
}
}

// 根据 executor 来执行, 这个时候执行时候的 `node` 是队首.
if (nullptr == _options.executor) {
// 在后台线程中, 使用 _execute_tasks 来执行.
bthread_t tid;
// We start the execution thread in background instead of foreground as
// we can't determine whether the code after execute() is urgent (like
// unlock a pthread_mutex_t) in which case implicit context switch may
// cause undefined behavior (e.g. deadlock)
//
// 在 background 中执行 (TODO: 这个地方会丢进队列吗?)
if (bthread_start_background(&tid, &_options.bthread_attr,
_execute_tasks, node) != 0) {
PLOG(FATAL) << "Fail to start bthread";
_execute_tasks(node);
}
} else {
// submit 异步执行.
if (_options.executor->submit(_execute_tasks, node) != 0) {
PLOG(FATAL) << "Fail to submit task";
_execute_tasks(node);
}
}
}

这个代码比较长,慢慢来:

  1. 如果是高优任务,添加高优的 counter,执行线程会检测到这个 counter,来做一些特殊处理
  2. 通过 _head.exchange(node, butil::memory_order_release) 来向栈中推进内容,_head 为 0 的时候,设置成功的能够成为这个 group 的 leader,在这个 bthread 或者线程下执行,否则返回程序
  3. (从这里开始都是 Leader 的行为) 根据 in_place 等参数,决定就地执行还是异步执行,派发给 _execute_tasks

_execute_tasks 里面有个大循环,每次会把本个 batch 尽量执行完(为什么是尽量呢,接着看):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

// 调用 execute 来执行对应的任务, 这是一个 static 函数, 需要合理处理高优先级任务.
void* ExecutionQueueBase::_execute_tasks(void* arg) {
ExecutionQueueVars* vars = get_execq_vars();
TaskNode* head = (TaskNode*)arg;
ExecutionQueueBase* m = (ExecutionQueueBase*)head->q;
TaskNode* cur_tail = NULL;
bool destroy_queue = false;
for (;;) {
// 如果队列头已经执行过, 那么切一个队列头(move next), 然后把之前的队列头
// 处理.
if (head->iterated) {
CHECK(head->next != NULL);
TaskNode* saved_head = head;
head = head->next;
m->return_task_node(saved_head);
}
int rc = 0;
// 如果有高优先级任务, 调用 _execute 直接处理高优任务, 如果没有高优任务, 调度走等待
// 投递进来继续组 batch.
if (m->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {
int nexecuted = 0;
// Don't care the return value (因为不会执行到 stop).
rc = m->_execute(head, true, &nexecuted);
// 减少对应的高优任务数量
m->_high_priority_tasks.fetch_sub(
nexecuted, butil::memory_order_relaxed);
// 如果 nexecuted == 0, 调度走, 等待任务被塞到执行队列.
if (nexecuted == 0) {
// Some high_priority tasks are not in queue
sched_yield();
}
} else {
// 没有高优任务, 执行现有的这批.
rc = m->_execute(head, false, NULL);
}
// 是否收到 stop 对象.
if (rc == ESTOP) {
destroy_queue = true;
}
// Release TaskNode until uniterated task or last task
//
// 释放掉现在所有的执行过的内容, 不过这里只是连续释放, 直到有 uniterated 的对象.
while (head->next != NULL && head->iterated) {
TaskNode* saved_head = head;
head = head->next;
m->return_task_node(saved_head);
}
// 到这个 batch 的尾部.
if (cur_tail == NULL) {
for (cur_tail = head; cur_tail->next != NULL;
cur_tail = cur_tail->next) {}
}
// break when no more tasks and head has been executed
//
// 把加进来的和已有的组成一个新的 Batch, cur_tail 和 &cur_tail 这个有点让人困惑, 因为队列要满足 FIFO 条件,
// 投递进来的原本是 T3->T2->T1. 在第一个 batch 里面, T1 设置到了 head = NULL 到 head = T1, 所以 T1 是
// leader, 这个时候, cur_tail == T1. old_head == T1, new_tail = &T1(即 head).
// 再次执行完的时候, 这个队列会变成 T3->T2, 但是执行顺序是 T2->T3, 所以, old_head 对应 T2, new_tail 也应该传入
// T2.
// 本来这里没有问题, 但是可能 has_uniterated == true, 需要缝合两个队列.
if (!m->_more_tasks(cur_tail, &cur_tail, !head->iterated)) {
CHECK_EQ(cur_tail, head);
CHECK(head->iterated);
m->return_task_node(head);
break;
}
}
if (destroy_queue) {
CHECK(m->_head.load(butil::memory_order_relaxed) == NULL);
CHECK(m->_stopped);
// Add _join_butex by 2 to make it equal to the next version of the
// ExecutionQueue from the same slot so that join with old id would
// return immediately.
//
// 1: release fence to make join sees the newest changes when it sees
// the newest _join_butex
m->_join_butex->fetch_add(2, butil::memory_order_release/*1*/);
butex_wake_all(m->_join_butex);
vars->execq_count << -1;
butil::return_resource(slot_of_id(m->_this_id));
}
vars->execq_active_count << -1;
return NULL;
}

这个地方逻辑如下:

  1. 处理已经迭代完的内容, 这里涉及 stop 之类的逻辑
  2. 调用 _execute 函数,来具体执行这个 batch
  3. 看看有没有没执行完的任务和最新的任务,走 _more_tasks

看客这个地方可能会怪我全部贴代码了,不过这个地方流程确实只能硬贴。因为 (2) (3) 都是有巨坑的

特殊情况:第一次执行

还记得 _head.exchange 不,这个地方第一次设置成功的 Nodenextnullptr,第一次执行的时候,整个 batch 只有它一个成员。

高优先级任务的处理

我们看到,start_execute 的时候,这里如果是高优任务,就会添加高优的 counter,在 _execute_tasks 的循环里,我们看到这里检查了高优任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int rc = 0;
// 如果有高优先级任务, 调用 _execute 直接处理高优任务, 如果没有高优任务, 调度走等待
// 投递进来继续组 batch.
if (m->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {
int nexecuted = 0;
// Don't care the return value (因为不会执行到 stop).
rc = m->_execute(head, true, &nexecuted);
// 减少对应的高优任务数量
m->_high_priority_tasks.fetch_sub(
nexecuted, butil::memory_order_relaxed);
// 如果 nexecuted == 0, 调度走, 等待任务被塞到执行队列.
if (nexecuted == 0) {
// Some high_priority tasks are not in queue
sched_yield();
}
} else {
// 没有高优任务, 执行现有的这批.
rc = m->_execute(head, false, NULL);
}

我们可以看看 _execute 函数的签名:

1
int ExecutionQueueBase::_execute(TaskNode* head, bool high_priority, int* niterated);

这里可以看到,_execute 里面有个 high_priority 的标记,这个是用来干什么的呢?我们这里介绍一下,一个 Batch 只能执行一种优先级的任务,要么高要么低。如果插入了一个高优任务,它要尽量在低优先级任务之前被执行。这里可以想象一下提供给用户的模型:

  • 有两个队列,一个低优一个高优
  • executor 并非是「Batch 执行」,它是串行执行的,从高优看看有没有,有就执行,否则从低优队列取一个任务执行

可以看到,这里是一个非 batch 执行的语义,这个 _execute 靠计数器实现了类似的语义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 具体执行的逻辑, start_execute -> execute_task -> execute.
int ExecutionQueueBase::_execute(TaskNode* head, bool high_priority, int* niterated) {
// 如果是一个 stop task, 那么具体执行
if (head != NULL && head->stop_task) {
CHECK(head->next == NULL);
head->iterated = true; // 设置自身为 executed.
head->status = EXECUTED;
// 如果是 stop, 那么这里 `high_priority == false`.
TaskIteratorBase iter(NULL, this, true, false);
_execute_func(_meta, _type_specific_function, iter);
if (niterated) {
*niterated = 1;
}
return ESTOP;
}
TaskIteratorBase iter(head, this, false, high_priority);
if (iter) {
_execute_func(_meta, _type_specific_function, iter);
}
// We must assign |niterated| with num_iterated even if we couldn't peek
// any task to execute at the beginning, in which case all the iterated
// tasks have been cancelled at this point. And we must return the
// correct num_iterated() to the caller to update the counter correctly.
if (niterated) {
*niterated = iter.num_iterated();
}
return 0;
}

这里比较重要的实现在 TaskIteratorBase 里面,这个类型实现了根据优先级来判断是否要迭代的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 传递给 execute 函数的迭代器.
class TaskIteratorBase {
DISALLOW_COPY_AND_ASSIGN(TaskIteratorBase);
friend class ExecutionQueueBase;
public:
// Returns true when the ExecutionQueue is stopped and there will never be
// more tasks and you can safely release all the related resources ever
// after.
bool is_queue_stopped() const { return _is_stopped; }
operator bool() const;
protected:
TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue,
bool is_stopped, bool high_priority)
: _cur_node(head)
, _head(head)
, _q(queue)
, _is_stopped(is_stopped)
, _high_priority(high_priority)
, _should_break(false)
, _num_iterated(0)
{ operator++(); }
~TaskIteratorBase();
void operator++();
TaskNode* cur_node() const { return _cur_node; }
private:
int num_iterated() const { return _num_iterated; }
bool should_break_for_high_priority_tasks();

TaskNode* _cur_node;
TaskNode* _head;
ExecutionQueueBase* _q; // 绑定的整个 queue 的对象.

// 注: 如果 _is_stopped == true, _high_pri 必定为 false.
bool _is_stopped;
bool _high_priority;

// 下面是内部的逻辑.
bool _should_break;
int _num_iterated;
};

这个的重点逻辑在 should_break_for_high_priority_tasks 里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 有高优任务需要插队.
inline bool TaskIteratorBase::should_break_for_high_priority_tasks() {
if (!_high_priority &&
_q->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {
_should_break = true;
return true;
}
return false;
}

void TaskIteratorBase::operator++() {
if (!(*this)) {
return;
}
// 已经 iter 的任务不用再处理.
if (_cur_node->iterated) {
_cur_node = _cur_node->next;
}
// 有高优任务需要插队的时候, 需要暂时 break 掉, 等待高优先级任务执行.
// 如果迭代中发现有高优先级任务, 那么这里不再会执行任一个 task.
if (should_break_for_high_priority_tasks()) {
return;
} // else the next high_priority_task would be delayed for at most one task

// 没有高优先级任务, 然后不是 stop.
while (_cur_node && !_cur_node->stop_task) {
// 如果优先级等同, 那么执行: 如果有高优先级的, 低优先级的不会被执行
if (_high_priority == _cur_node->high_priority) {
// 如果没有 iterated, 那么捞出来设置一下状态, 然后返回.
if (!_cur_node->iterated && _cur_node->peek_to_execute()) {
++_num_iterated;
_cur_node->iterated = true;
return;
}
_num_iterated += !_cur_node->iterated;
_cur_node->iterated = true;
}
// 取下一个可执行的任务。
_cur_node = _cur_node->next;
}
return;
}

组下一个 batch 的逻辑

组下一个 batch 的逻辑在 _more_tasks 中,但我们必须联动上一节,假设某个 Batch:

  1. 执行低优任务,有五个任务,执行了三个发现有高优任务插队了
  2. 执行高优任务,有10个任务,但是只有一个高优任务

这个时候,组 batch 的时候,可能自己手头上的 batch 还没执行完,用户又提交了,这就涉及到两个 batch 缝合,我们先看调用 _more_tasks 的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

// Release TaskNode until uniterated task or last task
//
// 释放掉现在所有的执行过的内容, 不过这里只是连续释放, 直到有 uniterated 的对象.
while (head->next != NULL && head->iterated) {
TaskNode* saved_head = head;
head = head->next;
m->return_task_node(saved_head);
}
// 到这个 batch 的尾部.
if (cur_tail == NULL) {
for (cur_tail = head; cur_tail->next != NULL;
cur_tail = cur_tail->next) {}
}
// break when no more tasks and head has been executed
//
// 把加进来的和已有的组成一个新的 Batch, cur_tail 和 &cur_tail 这个有点让人困惑, 因为队列要满足 FIFO 条件,
// 投递进来的原本是 T3->T2->T1. 在第一个 batch 里面, T1 设置到了 head = NULL 到 head = T1, 所以 T1 是
// leader, 这个时候, cur_tail == T1. old_head == T1, new_tail = &T1(即 head).
// 再次执行完的时候, 这个队列会变成 T3->T2, 但是执行顺序是 T2->T3, 所以, old_head 对应 T2, new_tail 也应该传入
// T2.
// 本来这里没有问题, 但是可能 has_uniterated == true, 需要缝合两个队列.
if (!m->_more_tasks(cur_tail, &cur_tail, !head->iterated)) {
CHECK_EQ(cur_tail, head);
CHECK(head->iterated);
m->return_task_node(head);
break;
}

这里根据任务有没有执行完,设置了不同的 cur_tail 来处理。然后我们看 _more_tasks:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
inline bool ExecutionQueueBase::_more_tasks(
TaskNode* old_head, TaskNode** new_tail,
bool has_uniterated) {

CHECK(old_head->next == NULL);
// Try to set _head to NULL to mark that the execution is done.
TaskNode* new_head = old_head;
TaskNode* desired = NULL;
bool return_when_no_more = false;
if (has_uniterated) { // desired 设置到 old_head, 方便组 batch.
desired = old_head;
return_when_no_more = true;
}
// _head 和 old_head 如果相等, 就把 desired 设置给 _head. 只有没有新任务来的时候, 会相等.
// desired 在迭代完全的时候, 会是 NULL, 这个时候队列也设置为空; 否则设置为这次迭代的尾部, 允许别人再 append.
// 失败的时候, 新的 _head 会被加载到 `new_head` 中.
if (_head.compare_exchange_strong(
new_head, desired, butil::memory_order_acquire)) {
// No one added new tasks.
// 没有新任务, 这里返回本队列是否迭代完成.
return return_when_no_more;
}
CHECK_NE(new_head, old_head);
// Above acquire fence pairs release fence of exchange in Write() to make
// sure that we see all fields of requests set.

// Someone added new requests.
// Reverse the list until old_head.
TaskNode* tail = NULL;
if (new_tail) {
*new_tail = new_head;
}
TaskNode* p = new_head;
do {
// TODO(mwish): 这个地方和 enqueue 有并发, 没有问题吗.
while (p->next == TaskNode::UNCONNECTED) {
// TODO(gejun): elaborate this
sched_yield();
}
TaskNode* const saved_next = p->next;
p->next = tail;
tail = p;
p = saved_next;
CHECK(p != NULL);
} while (p != old_head);

// Link old list with new list.
old_head->next = tail;
return true;
}

这里会把两个 queue 拼成一个,大概逻辑如下:

1
2
3
4
5
6
7
8
9
正在执行的 Queue: T1(done)->T2(done)->T3(undone, head)
head_ 有关的结构: (head)T6->T5->T4->T3->T2->T1

回收空间,正在执行的 Queue: T3(undone, head)
head_ 有关的结构: (head)T6->T5->T4->T3->T2->T1

拼接后, queue 有关的结构: T3->T4->T5->T6
head_ 有关的结构: (head)T6->T5->T4->T3

这里的执行流程如下

  1. 对已经执行的 Task 的资源回收,调用 ExecutionQueueBase::return_task_node,在 _more_tasks 之前回首掉
  2. 会组好 Batch,然后等待下一次执行。这里会如上图一样拼接
  3. 如果没有更多的任务,可能会回收 heads

Graceful shutdown

对于一个关闭的 queue,首先,投递的任务会失败:

1
2
3
4
5
6
7
8
// (在 class ExecutionQueue<T> 里面)
int execute(typename butil::add_const_reference<T>::type task,
const TaskOptions* options, TaskHandle* handle) {
if (stopped()) {
return EINVAL;
}
...
}

其次,这里有一套引用计数机制,来保证持有的地方不会失效:

1
2

ExecutionQueueBase::scoped_ptr_t ExecutionQueueBase::address(uint64_t id);

当引用计数差不多得了的时候,这里会投递一些关闭的 flag,调用 _on_recycle:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

void ExecutionQueueBase::_on_recycle() {
// Push a closed tasks
while (true) {
TaskNode* node = butil::get_object<TaskNode>();
if (BAIDU_LIKELY(node != NULL)) {
get_execq_vars()->running_task_count << 1;
node->stop_task = true;
node->high_priority = false;
node->in_place = false;
start_execute(node);
break;
}
CHECK(false) << "Fail to create task_node_t, " << berror();
::bthread_usleep(1000);
}
}