intdemo_execute(void* meta, TaskIterator<T>& iter){ if (iter.is_queue_stopped()) { // destroy meta and related resources return0; } for (; iter; ++iter) { // user_fn(*iter) // or user_fn(iter->a_member_of_T) } return0; }
structExecutionQueueOptions { ExecutionQueueOptions(); // Attribute of the bthread which execute runs on // default: BTHREAD_ATTR_NORMAL bthread_attr_t bthread_attr;
// Executor that tasks run on. bthread will be used when executor = NULL. // Note that TaskOptions.in_place_if_possible = false will not work, if implementation of // Executor is in-place(synchronous). Executor * executor; };
// Start a ExecutionQueue. If |options| is NULL, the queue will be created with // the default options. // Returns 0 on success, errno otherwise // NOTE: type |T| can be non-POD but must be copy-constructible template <typename T> intexecution_queue_start( ExecutionQueueId<T>* id, const ExecutionQueueOptions* options, int (*execute)(void* meta, TaskIterator<T>& iter), void* meta);
// Thread-safe and Wait-free. // Execute a task with options. e.g // bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT) // If |options| is NULL, we will use default options (normal task) // If |handle| is not NULL, we will assign it with the handler of this task. template <typename T> intexecution_queue_execute(ExecutionQueueId<T> id, typename butil::add_const_reference<T>::type task, const TaskOptions* options); template <typename T> intexecution_queue_execute(ExecutionQueueId<T> id, typename butil::add_const_reference<T>::type task, const TaskOptions* options, TaskHandle* handle);
// [Thread safe and ABA free] Cancel the corresponding task. // Returns: // -1: The task was executed or h is an invalid handle // 0: Success // 1: The task is executing intexecution_queue_cancel(const TaskHandle& h);
// Executor would execute high-priority tasks in the FIFO order but before // all pending normal-priority tasks. // NOTE: We don't guarantee any kind of real-time as there might be tasks still // in process which are uninterruptible. // // Default: false bool high_priority;
// If |in_place_if_possible| is true, execution_queue_execute would call // execute immediately instead of starting a bthread if possible // // Note: Running callbacks in place might cause the dead lock issue, you // should be very careful turning this flag on. // // Default: false bool in_place_if_possible; };
// _high_priority_tasks 是表示正在执行的/等待的任务是否有高优先级任务. if (node->high_priority) { // Add _high_priority_tasks before pushing this task into queue to // make sure that _execute_tasks sees the newest number when this // task is in the queue. Although there might be some useless for // loops in _execute_tasks if this thread is scheduled out at this // point, we think it's just fine. _high_priority_tasks.fetch_add(1, butil::memory_order_relaxed); }
// 拿到上一次的 prev_head, 这个时候, 竞选要求只有 prev_head 是 nullptr 的时候, // 才能成为 group 的 leader. // 这里如果没有成为 group 的 leader, 会在这里堆积.
// Group 的 Leader 有权限执行任务. // // Get the right to execute the task, start a bthread to avoid deadlock // or stack overflow // // 因为是 leader, 它的 next 应该是 null. node->next = NULL; node->q = this;
ExecutionQueueVars* const vars = get_execq_vars(); vars->execq_active_count << 1; if (node->in_place) { // 如果是 in-place, 那么在本线程(bthread) 执行. int niterated = 0; _execute(node, node->high_priority, &niterated); TaskNode* tmp = node; // return if no more // 如果是高优任务, 那么处理一下 if (node->high_priority) { _high_priority_tasks.fetch_sub(niterated, butil::memory_order_relaxed); } // 如果有 more_tasks, 切到别的线程执行这个 group. if (!_more_tasks(tmp, &tmp, !node->iterated)) { vars->execq_active_count << -1; return_task_node(node); return; } }
// 根据 executor 来执行, 这个时候执行时候的 `node` 是队首. if (nullptr == _options.executor) { // 在后台线程中, 使用 _execute_tasks 来执行. bthread_t tid; // We start the execution thread in background instead of foreground as // we can't determine whether the code after execute() is urgent (like // unlock a pthread_mutex_t) in which case implicit context switch may // cause undefined behavior (e.g. deadlock) // // 在 background 中执行 (TODO: 这个地方会丢进队列吗?) if (bthread_start_background(&tid, &_options.bthread_attr, _execute_tasks, node) != 0) { PLOG(FATAL) << "Fail to start bthread"; _execute_tasks(node); } } else { // submit 异步执行. if (_options.executor->submit(_execute_tasks, node) != 0) { PLOG(FATAL) << "Fail to submit task"; _execute_tasks(node); } } }
// 具体执行的逻辑, start_execute -> execute_task -> execute. int ExecutionQueueBase::_execute(TaskNode* head, bool high_priority, int* niterated) { // 如果是一个 stop task, 那么具体执行 if (head != NULL && head->stop_task) { CHECK(head->next == NULL); head->iterated = true; // 设置自身为 executed. head->status = EXECUTED; // 如果是 stop, 那么这里 `high_priority == false`. TaskIteratorBase iter(NULL, this, true, false); _execute_func(_meta, _type_specific_function, iter); if (niterated) { *niterated = 1; } return ESTOP; } TaskIteratorBase iter(head, this, false, high_priority); if (iter) { _execute_func(_meta, _type_specific_function, iter); } // We must assign |niterated| with num_iterated even if we couldn't peek // any task to execute at the beginning, in which case all the iterated // tasks have been cancelled at this point. And we must return the // correct num_iterated() to the caller to update the counter correctly. if (niterated) { *niterated = iter.num_iterated(); } return0; }
void TaskIteratorBase::operator++() { if (!(*this)) { return; } // 已经 iter 的任务不用再处理. if (_cur_node->iterated) { _cur_node = _cur_node->next; } // 有高优任务需要插队的时候, 需要暂时 break 掉, 等待高优先级任务执行. // 如果迭代中发现有高优先级任务, 那么这里不再会执行任一个 task. if (should_break_for_high_priority_tasks()) { return; } // else the next high_priority_task would be delayed for at most one task