```cpp
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Referenced in rpc, needs to be extern.
// Notice that we can't declare the variable as atomic<TaskControl*> which
// are not constructed before main().
TaskControl* g_task_control = NULL;
```
```cpp
// Try to push into _rq. If the queue is full, too many bthreads have been
// created and we have to sleep for a while.
inline void TaskGroup::push_rq(bthread_t tid) {
    while (!_rq.push(tid)) {
        // Created too many bthreads: a promising approach is to insert the
        // task into another TaskGroup, but we don't use it because:
        // * There're already many bthreads to run, inserting the bthread
        //   into other TaskGroup does not help.
        // * Insertions into other TaskGroups perform worse when all workers
        //   are busy at creating bthreads (proved by test_input_messenger in
        //   brpc)
        flush_nosignal_tasks();
        LOG_EVERY_SECOND(ERROR) << "_rq is full, capacity=" << _rq.capacity();
        // TODO(gejun): May cause deadlock when all workers are spinning here.
        // A better solution is to pop and run existing bthreads, however which
        // make set_remained()-callbacks do context switches and need extensive
        // reviews on related code.
        ::usleep(1000);
    }
}
```
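For context, push_rq is not called by users directly; it sits behind ready_to_run, which pairs the push with a signal_task call so that sleeping workers get woken. Roughly like this (simplified from TaskGroup::ready_to_run; details may differ across brpc versions):

```cpp
// Sketch: how a newly-runnable bthread reaches push_rq (simplified from
// TaskGroup::ready_to_run, not a verbatim copy of any particular version).
void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
    push_rq(tid);                      // may spin + sleep if _rq is full
    if (nosignal) {
        ++_num_nosignal;               // batched: signal later in one shot
    } else {
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _nsignaled += 1 + additional_signal;
        _control->signal_task(1 + additional_signal);  // wake idle workers
    }
}
```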
```cpp
void TaskControl::signal_task(int num_task) {
    if (num_task <= 0) {
        return;
    }
    // TODO(gejun): Current algorithm does not guarantee enough threads will
    // be created to match caller's requests. But in another side, there's also
    // many useless signalings according to current impl. Capping the concurrency
    // is a good balance between performance and timeliness of scheduling.
    if (num_task > 2) {
        num_task = 2;
    }
    // Pick a ParkingLot: we only need to find 1-2 workers to wake up here.
    int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
    num_task -= _pl[start_index].signal(1);
    if (num_task > 0) {
        for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
            if (++start_index >= PARKING_LOT_NUM) {
                start_index = 0;
            }
            num_task -= _pl[start_index].signal(1);
        }
    }
    // Tasks are still left over (probably rare); we may need to add workers
    // dynamically.
    if (num_task > 0 &&
        FLAGS_bthread_min_concurrency > 0 &&    // test min_concurrency for performance
        _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
        // TODO: Reduce this lock
        BAIDU_SCOPED_LOCK(g_task_control_mutex);
        if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
            add_workers(1);
        }
    }
}
```
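The other half of this handshake is the worker's idle loop: a worker parks on its ParkingLot and, once signaled, tries to steal before going back to sleep. A minimal sketch of that side (simplified from TaskGroup::wait_task; the real code also has a BTHREAD_DONT_SAVE_PARKING_STATE variant):

```cpp
// Sketch of the wait side: park until signal_task() pokes this ParkingLot,
// then try to steal a task; loop if someone else grabbed it first.
bool TaskGroup::wait_task(bthread_t* tid) {
    do {
        if (_last_pl_state.stopped()) {
            return false;              // TaskControl is shutting down
        }
        _pl->wait(_last_pl_state);     // futex-style wait on the ParkingLot
        if (steal_task(tid)) {
            return true;               // got something to run
        }
    } while (true);
}
```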
```cpp
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}
```
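The seed/offset pair is what makes this scan both cheap and fair: the offset is picked from a table of primes when the worker starts, so whenever it is coprime with ngroup, the walk `s, s+offset, s+2*offset, ... (mod ngroup)` probes every group exactly once per round, while different workers start from different positions. A tiny standalone check of that property (illustrative only, not brpc code):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative check: stepping by an offset coprime with ngroup visits
// every group exactly once within ngroup steps.
int main() {
    const size_t ngroup = 9;       // hypothetical number of TaskGroups
    const size_t offset = 7;       // a prime, coprime with 9
    size_t s = 12345;              // arbitrary seed
    std::vector<bool> visited(ngroup, false);
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        visited[s % ngroup] = true;
    }
    for (bool v : visited) {
        assert(v);                 // every group was probed exactly once
    }
    return 0;
}
```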
sched_to
This is the most important function: it is where a fiber actually gets to run and where the stack switch happens.
```cpp
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
            // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
            // This basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}
```
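Why the fallback branch exists becomes clearer once you see how get_stack dispatches: it maps each stack type to a per-type stack pool and deliberately returns NULL for the pthread type. A condensed sketch of that dispatch (based on src/bthread/stack_inl.h; names and pool mechanics may differ across versions):

```cpp
// Condensed sketch of get_stack: per-type pools, NULL for pthread stacks.
// Returning NULL here is exactly what drives the fallback in sched_to above.
inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
    switch (type) {
    case STACK_TYPE_PTHREAD:
        return NULL;  // run on the worker pthread's own stack (_main_stack)
    case STACK_TYPE_SMALL:
        return StackFactory<SmallStackClass>::get_stack(entry);
    case STACK_TYPE_NORMAL:
        return StackFactory<NormalStackClass>::get_stack(entry);
    case STACK_TYPE_LARGE:
        return StackFactory<LargeStackClass>::get_stack(entry);
    case STACK_TYPE_MAIN:
        return StackFactory<MainStackClass>::get_stack(entry);
    }
    return NULL;
}
```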
```cpp
struct StackStorage {
    int stacksize;
    int guardsize;
    // Assume stack grows upwards.
    // http://www.boost.org/doc/libs/1_55_0/libs/context/doc/html/context/stack.html
    void* bottom;
    unsigned valgrind_stack_id;

    // Clears all members.
    void zeroize() {
        stacksize = 0;
        guardsize = 0;
        bottom = NULL;
        valgrind_stack_id = 0;
    }
};

// Allocate a piece of stack.
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
// Deallocate a piece of stack. Parameters MUST be returned or set by the
// corresponding allocate_stack_storage() otherwise behavior is undefined.
void deallocate_stack_storage(StackStorage* s);
```
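The concrete allocator is not shown here. Conceptually it either mallocs the stack (when guardsize <= 0) or mmaps stacksize + guardsize bytes and mprotects the low guard region to PROT_NONE, so a stack overflow faults instead of silently corrupting neighboring memory. A minimal illustrative version, not the brpc implementation (error handling, page alignment and valgrind registration simplified; the name is hypothetical):

```cpp
#include <sys/mman.h>

// Minimal illustrative allocator: stack plus a guard region at the low end.
int allocate_stack_storage_sketch(StackStorage* s, int stacksize, int guardsize) {
    const int memsize = stacksize + guardsize;
    void* mem = mmap(NULL, memsize, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) {
        return -1;
    }
    // The stack grows downward on x86: protecting the lowest guardsize bytes
    // turns an overflow into SIGSEGV instead of heap corruption.
    if (guardsize > 0 && mprotect(mem, guardsize, PROT_NONE) != 0) {
        munmap(mem, memsize);
        return -1;
    }
    s->bottom = (char*)mem + memsize;  // "bottom" is the high address handed to fcontext
    s->stacksize = stacksize;
    s->guardsize = guardsize;
    s->valgrind_stack_id = 0;
    return 0;
}
```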
```cpp
do {
    // A task can be stopped before it gets running, in which case
    // we may skip user function, but that may confuse user:
    // Most tasks have variables to remember running result of the task,
    // which is often initialized to values indicating success. If an
    // user function is never called, the variables will be unchanged
    // however they'd better reflect failures because the task is stopped
    // abnormally.

    // Meta and identifier of the task is persistent in this run.
    TaskMeta* const m = g->_cur_meta;

    if (FLAGS_show_bthread_creation_in_vars) {
        // NOTE: the thread triggering exposure of pending time may spend
        // considerable time because a single bvar::LatencyRecorder
        // contains many bvar.
        g->_control->exposed_pending_time() <<
            (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
    }

    // Not catch exceptions except ExitException which is for implementing
    // bthread_exit(). User code is intended to crash when an exception is
    // not caught explicitly. This is consistent with other threading
    // libraries.

    // Logging must be done before returning the keytable, since the logging lib
    // use bthread local storage internally, or will cause memory leak.
    // FIXME: the time from quiting fn to here is not counted into cputime
    if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
                  << m->stat.cputime_ns / 1000000.0 << "ms";
    }

    // Clean tls variables, must be done before changing version_butex
    // otherwise another thread just joined this thread may not see side
    // effects of destructing tls variables.
    KeyTable* kt = tls_bls.keytable;
    if (kt != NULL) {
        return_keytable(m->attr.keytable_pool, kt);
        // After deletion: tls may be set during deletion.
        tls_bls.keytable = NULL;
        m->local_storage.keytable = NULL; // optional
    }

    // Increase the version and wake up all joiners, if resulting version
    // is 0, change it to 1 to make bthread_t never be 0. Any access
    // or join to the bthread after changing version will be rejected.
    // The spinlock is for visibility of TaskGroup::get_attr.
    {
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (0 == ++*m->version_butex) {
            ++*m->version_butex;
        }
    }
    butex_wake_except(m->version_butex, 0);
```
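The joiner side makes this pairing clearer: bthread_join just butex_wait()s on version_butex until the version embedded in the tid no longer matches, so the increment plus butex_wake_except above is exactly what releases it. Roughly (simplified from TaskGroup::join; argument checks trimmed):

```cpp
// Sketch of the join side. get_version(tid) extracts the version bits
// encoded in the bthread_t.
int TaskGroup::join(bthread_t tid, void** return_value) {
    TaskMeta* m = address_meta(tid);
    if (m == NULL) {
        return EINVAL;  // never created
    }
    const uint32_t expected_version = get_version(tid);
    while (*m->version_butex == expected_version) {
        // Returns once task_runner bumps the version and wakes this butex.
        if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
    return 0;
}
```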
```cpp
        // Logging must be done after switching the local storage, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }
#ifndef NDEBUG
            else {
                // else pthread_task is switching to another pthread_task, sc
                // can only equal when they're both _main_stack
                CHECK(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }
```
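The remained callback is how work that must happen "after the switch, on the new context" gets scheduled. For example, when a running bthread starts another one in the foreground, it cannot push itself back to the run queue before jumping (it is still executing on that stack), so it registers a callback that the next context runs on its behalf. Roughly (simplified from TaskGroup::start_foreground and ready_to_run_in_worker):

```cpp
// Sketch: deferring "requeue the caller" until after the stack switch.
ReadyToRunArgs args = { g->current_tid(), false /*signal workers*/ };
g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
TaskGroup::sched_to(&g, new_tid);   // the callback fires in the loop above,
                                    // already running on new_tid's context
```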
```cpp
// One task has finished running; try to schedule the next one.
// TaskGroup::task_runner wraps each task with this logic, so this function is
// itself invoked from task_runner.
void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
    // Would BTHREAD_FAIR_WSQ help here?
#ifndef BTHREAD_FAIR_WSQ
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
    // to 2.9%
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    // Only when the local rq is empty and nothing can be stolen do we fall
    // back to _main_tid, which then waits to be woken up.
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }

    // Grab the current meta; it may be recycled afterwards.
    TaskMeta* const cur_meta = g->_cur_meta;
    // Decide which stack to jump to next.
    TaskMeta* next_meta = address_meta(next_tid);
    // A stack needs to be prepared.
    if (next_meta->stack == NULL) {
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // also works with pthread_task scheduling to pthread_task, the
            // transferred stack is just _main_stack.
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            // Create a new stack.
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
                // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
                // This basically means that if we can't allocate stack, run
                // the task in pthread directly.
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}
```
One remaining question: the return address set up for an fcontext is _exit, so wouldn't running off the end of the context terminate the process? The answer is that ending_sched retires the current stack before that can ever happen: it either hands the stack over to the next task or swaps in a fresh one, then jumps away, so control never returns through the context's epilogue into _exit.
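Put differently, the entry function installed on every fcontext stack is task_runner, and task_runner loops: run one bthread, then let ending_sched pick the next one and jump away. A skeleton of that loop (heavily condensed from TaskGroup::task_runner):

```cpp
// Skeleton of task_runner: the fcontext entry never falls off its own end
// while bthreads remain; ending_sched() jumps to the next task instead.
void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        while (g->_last_context_remained) {   // callbacks deferred by the previous context
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
    }
    do {
        // ... run the user's fn, clean up tls/keytable, wake joiners ...
        ending_sched(&g);                     // switch to the next bthread or _main_tid
    } while (g->_cur_meta->tid != g->_main_tid);
    // Only the worker's main task (running on the pthread stack) reaches here.
}
```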