kthread Scheduling and Execution Flow
Overall Flow
TaskGroup is responsible for scheduling and executing kthreads. Each TaskGroup corresponds to one pthread and owns two run queues, _rq and _remote_rq, which hold pending kthreads. Kthreads created from within a worker (that is, by a running kthread) are placed in _rq, while kthreads created by ordinary (non-worker) pthreads are placed in _remote_rq. TaskControl is a global singleton that manages all TaskGroup instances.
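As a rough illustration of the two-queue routing, here is a minimal single-threaded C++ sketch. ToyTaskGroup, tls_toy_group and toy_submit are hypothetical names. The real code uses concurrent queues and picks the target group with choose_one_group(); that choice is replaced here by a simple tid % groups.size().

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for TaskGroup: one per worker pthread.
struct ToyTaskGroup {
    std::deque<uint64_t> rq;         // filled by code already running on this worker
    std::deque<uint64_t> remote_rq;  // filled by ordinary (non-worker) pthreads
};

// Mirrors tls_task_group: non-null only on worker threads.
thread_local ToyTaskGroup* tls_toy_group = nullptr;

// Route a new task id to a queue depending on who creates it.
void toy_submit(std::vector<ToyTaskGroup>& groups, uint64_t tid) {
    if (tls_toy_group != nullptr) {
        tls_toy_group->rq.push_back(tid);  // created from a worker
    } else {
        // Created from a plain pthread; hypothetical group choice.
        groups[tid % groups.size()].remote_rq.push_back(tid);
    }
}
```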
Core Interfaces
TaskControl
TaskControl is a singleton. Its initialization is shown below. The core logic creates _concurrency worker threads (kthread_worker), and each one runs the worker_thread function:
int TaskControl::init(int concurrency) {
    _concurrency = concurrency;
    _workers.resize(_concurrency);
    for (int i = 0; i < _concurrency; ++i) {
        const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
        ...
    }
    ...
}
worker_thread creates a TaskGroup g via create_group, registers it with TaskControl, and sets the thread-local storage (TLS) variable tls_task_group to g. As a result, only worker threads have a non-null tls_task_group. It then runs TaskGroup::run_main_task:
void* TaskControl::worker_thread(void* arg) {
    TaskControl* c = static_cast<TaskControl*>(arg);
    TaskGroup* g = c->create_group();
    ...
    tls_task_group = g;
    c->_nworkers << 1;
    g->run_main_task();
    ...
}
TaskGroup* TaskControl::create_group() {
    ...
    g->init(FLAGS_task_group_runqueue_capacity);
    ...
}
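The startup sequence can be modeled with std::thread and a thread_local pointer. This is a hypothetical sketch: ToyControl, ToyGroup and tls_group are illustrative names, not kthread APIs, and std::thread stands in for pthread_create. It shows only two things: each worker creates and registers its own group, and the TLS pointer is set on worker threads alone.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for TaskGroup.
struct ToyGroup {
    bool ran = false;  // set by the worker that owns this group
};

// Mirrors tls_task_group: non-null only on worker threads.
thread_local ToyGroup* tls_group = nullptr;

class ToyControl {
public:
    // Like TaskControl::init: spawn `concurrency` workers.
    void init(int concurrency) {
        for (int i = 0; i < concurrency; ++i) {
            workers_.emplace_back([this] { worker_thread(); });
        }
    }
    void join() {
        for (std::thread& t : workers_) t.join();
    }
    std::size_t ngroups() {
        std::lock_guard<std::mutex> lock(mu_);
        return groups_.size();
    }
    bool all_ran() {
        std::lock_guard<std::mutex> lock(mu_);
        for (const auto& g : groups_) {
            if (!g->ran) return false;
        }
        return true;
    }

private:
    // Like worker_thread: create a group, publish it in TLS, enter the main loop.
    void worker_thread() {
        tls_group = create_group();
        run_main_task();
    }
    // Stand-in for the scheduling loop; it only shows that TLS is visible here.
    void run_main_task() { tls_group->ran = true; }

    // Like create_group: allocate and register a group under a lock.
    ToyGroup* create_group() {
        std::lock_guard<std::mutex> lock(mu_);
        groups_.push_back(std::unique_ptr<ToyGroup>(new ToyGroup()));
        return groups_.back().get();
    }

    std::mutex mu_;
    std::vector<std::unique_ptr<ToyGroup>> groups_;
    std::vector<std::thread> workers_;
};
```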
TaskGroup
Each TaskGroup maps to one pthread. Its initialization function is shown below: it creates rq and remote_rq (both queues for pending kthreads), then initializes main_stack and main_tid. main_tid represents the kthread ID of the main execution flow, and the roles of main_stack and main_tid are explained in detail later. TaskMeta stores metadata for a kthread (e.g., execution function, arguments, local storage). Here, cur_meta is set to the TaskMeta corresponding to main_tid.
int TaskGroup::init(size_t runqueue_capacity) {
    _rq.init(runqueue_capacity);
    _remote_rq.init(runqueue_capacity / 2);
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    ...
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
    ...
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);
    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}
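make_tid builds the tid from the TaskMeta's version and its resource slot. This document does not show the exact layout. The sketch below assumes the layout used by brpc's bthread: the version in the high 32 bits and the slot index in the low 32 bits. Verify that against the kthread source. toy_make_tid, toy_slot_of and toy_version_of are hypothetical helpers.

```cpp
#include <cassert>
#include <cstdint>

using toy_tid_t = uint64_t;

// Assumed layout: [ version (high 32 bits) | slot (low 32 bits) ].
toy_tid_t toy_make_tid(uint32_t version, uint32_t slot) {
    return (static_cast<toy_tid_t>(version) << 32) | slot;
}

// The slot part is what an address_meta-style lookup would index by.
uint32_t toy_slot_of(toy_tid_t tid) {
    return static_cast<uint32_t>(tid & 0xFFFFFFFFu);
}

// The version part distinguishes a stale tid from a new task reusing the same slot.
uint32_t toy_version_of(toy_tid_t tid) {
    return static_cast<uint32_t>(tid >> 32);
}
```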
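Each worker then loops in run_main_task. wait_task returns the tid of a runnable kthread and blocks the worker while none is available. It returns false only when the worker is being stopped. wait_task first pops from the current TaskGroup's _remote_rq; if that is empty, it steals from the _rq and _remote_rq of other TaskGroup instances. When sched_to returns inside run_main_task, _cur_meta is normally the main task again. If it is not, the chosen kthread is a pthread-type task that shares _main_stack, so no stack switch happened. In that case run_main_task runs the task in place via task_runner, passing 1 to skip the remained callbacks that sched_to has already executed.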
void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
    TaskGroup* dummy = this;
    kthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
    }
}
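The lookup order described for wait_task can be modeled single-threaded as shown below. This is a simplified sketch. ToyGroup, toy_pop_oldest and toy_find_task are hypothetical. Peers are scanned in plain index order, and where the real wait_task would block, this model returns false.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical per-worker queues (no locking; single-threaded model).
struct ToyGroup {
    std::deque<uint64_t> rq;         // owner pushes at the back
    std::deque<uint64_t> remote_rq;  // filled by non-worker pthreads, FIFO
};

// Take the oldest entry, as a steal or a FIFO pop would.
bool toy_pop_oldest(std::deque<uint64_t>& q, uint64_t* tid) {
    if (q.empty()) return false;
    *tid = q.front();
    q.pop_front();
    return true;
}

// Order from the text: own _remote_rq first, then other groups' _rq and _remote_rq.
bool toy_find_task(std::vector<ToyGroup>& groups, std::size_t self, uint64_t* tid) {
    if (toy_pop_oldest(groups[self].remote_rq, tid)) return true;
    for (std::size_t i = 0; i < groups.size(); ++i) {
        if (i == self) continue;
        if (toy_pop_oldest(groups[i].rq, tid) ||
            toy_pop_oldest(groups[i].remote_rq, tid)) {
            return true;
        }
    }
    return false;  // the real wait_task would block here instead of giving up
}
```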
Once a runnable tid is obtained, sched_to is called. It first retrieves the TaskMeta corresponding to the tid:
- If a stack has already been allocated for this TaskMeta, sched_to(pg, next_meta) is called directly. The core logic of that overload is to jump to next_meta via jump_stack(cur_meta->stack, next_meta->stack).
- Otherwise, a new stack is first allocated via get_stack, with task_runner as its entry point, and then sched_to(pg, next_meta) is called. If no stack can be allocated (the task is BTHREAD_STACKTYPE_PTHREAD, or memory is exhausted), the task is forced to BTHREAD_STACKTYPE_PTHREAD and runs on _main_stack.
inline void TaskGroup::sched_to(TaskGroup** pg, kthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
            // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
            // This basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}
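The lazy stack assignment in sched_to, including its fallback to the pthread's own stack, reduces to the small decision below. This is a hypothetical model. ToyStack, ToyMeta, toy_get_stack and toy_ensure_stack are illustrative. A fixed pool and a simulate_oom flag stand in for real allocation and its failure.

```cpp
#include <cassert>

enum class StackType { kMain, kNormal, kPthread };

struct ToyStack {
    StackType type;
};

struct ToyMeta {
    StackType stack_type;
    ToyStack* stack = nullptr;  // null until the task is first scheduled
};

// Hypothetical allocator: pthread-type tasks never get a stack of their own,
// and simulate_oom (or an exhausted pool) models an allocation failure.
ToyStack* toy_get_stack(StackType type, bool simulate_oom) {
    static ToyStack pool[16];
    static int used = 0;
    if (type == StackType::kPthread || simulate_oom || used == 16) {
        return nullptr;
    }
    pool[used].type = type;
    return &pool[used++];
}

// Mirrors the first half of sched_to(pg, next_tid).
void toy_ensure_stack(ToyMeta* next, ToyStack* main_stack, bool simulate_oom) {
    if (next->stack != nullptr) {
        return;  // already has a stack: sched_to just jumps to it
    }
    ToyStack* stk = toy_get_stack(next->stack_type, simulate_oom);
    if (stk != nullptr) {
        next->stack = stk;
    } else {
        // Fallback: run the task directly on the pthread's own stack.
        next->stack_type = StackType::kPthread;
        next->stack = main_stack;
    }
}
```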
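The core logic of task_runner is as follows:
- Execute the remained functions (preliminary work that must be completed before a kthread runs its own logic; explained later).
- Execute the function stored in the TaskMeta. The kthread may be moved to another worker while it runs, so the TaskGroup may change; for that reason g is re-read from tls_task_group afterwards.
- Finally, call ending_sched.
The do/while loop is needed because ending_sched may hand the current stack directly to the next kthread. In that case sched_to performs no stack switch and simply returns, and the loop runs the next kthread's function on the same stack. The loop exits once the current task is the main task.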
void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    // different groups.
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
    }
    do {
        TaskMeta* const m = g->_cur_meta;
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        // Group is probably changed
        g = tls_task_group;
        ...
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);
    } while (g->_cur_meta->tid != g->_main_tid);
    // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
    // tasks to run, quit for more tasks.
}
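The remained mechanism works like a one-slot deferred callback stored on the group. Whoever is about to switch away installs it, and whoever runs next executes it before doing anything else. The sketch below models only that ordering, single-threaded and without any stack switching. ToyGroup, ReleaseArg and the toy_* functions are hypothetical names, and the real _release_last_context does more than write a log entry.

```cpp
#include <cassert>
#include <string>
#include <vector>

using RemainedFn = void (*)(void*);

// Hypothetical stand-in for the group-level "remained" slot.
struct ToyGroup {
    RemainedFn last_remained = nullptr;
    void* last_remained_arg = nullptr;
    std::vector<std::string> log;

    void set_remained(RemainedFn fn, void* arg) {
        last_remained = fn;
        last_remained_arg = arg;
    }
};

// Argument for a callback that "releases" the previous task's context.
struct ReleaseArg {
    ToyGroup* g;
    std::string task;
};

void toy_release_last_context(void* arg) {
    ReleaseArg* r = static_cast<ReleaseArg*>(arg);
    r->g->log.push_back("release:" + r->task);
}

// Same shape as task_runner: drain remained callbacks, then run the task body.
void toy_task_runner(ToyGroup* g, const std::string& task) {
    while (g->last_remained != nullptr) {
        RemainedFn fn = g->last_remained;
        g->last_remained = nullptr;  // clear first: fn may install a new callback
        fn(g->last_remained_arg);
    }
    g->log.push_back("run:" + task);
}
```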
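ending_sched looks for the next runnable kthread. It first pops from its own _rq (or calls steal on it when BTHREAD_FAIR_WSQ is defined), then tries steal_task:
- If nothing is found, the next execution target is the TaskMeta corresponding to main_tid.
- If the chosen TaskMeta has no stack yet, it inherits the current task's stack when both have the same stack type. Otherwise a new stack is allocated via get_stack, falling back to _main_stack on failure.
- It then switches to next_meta via sched_to(pg, next_meta), described above.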
void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    kthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
    // to 2.9%
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }
    TaskMeta* const cur_meta = g->_cur_meta;
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // also works with pthread_task scheduling to pthread_task, the
            // transferred stack is just _main_stack.
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
                // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
                // This basically means that if we can't allocate stack, run
                // the task in pthread directly.
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}
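The two decisions in ending_sched, which task to run next and where its stack comes from, can be sketched separately. The model makes an assumption not stated in the text: the work-stealing queue's owner-side pop returns the newest entry (LIFO), as in the default build without BTHREAD_FAIR_WSQ. Peer stealing is reduced to the group's own remote queue, and stack ids are plain integers. All names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical single-threaded stand-ins.
struct ToyGroup {
    std::deque<uint64_t> rq;         // owner pushes and pops at the back
    std::deque<uint64_t> remote_rq;  // FIFO
    uint64_t main_tid = 0;
};

// Next-task selection in the spirit of ending_sched (default build).
uint64_t toy_pick_next(ToyGroup& g) {
    if (!g.rq.empty()) {  // like _rq.pop(): newest task first (assumed LIFO)
        uint64_t tid = g.rq.back();
        g.rq.pop_back();
        return tid;
    }
    if (!g.remote_rq.empty()) {  // stand-in for steal_task(); peers omitted
        uint64_t tid = g.remote_rq.front();
        g.remote_rq.pop_front();
        return tid;
    }
    return g.main_tid;  // nothing to run: go back to the main task
}

enum class StackType { kSmall, kNormal };

struct ToyMeta {
    StackType type;
    int stack = -1;  // -1: no stack assigned yet
};

// A stackless next task inherits the finishing task's stack when the types
// match, so no allocation and no stack switch are needed.
void toy_assign_stack(ToyMeta& cur, ToyMeta& next, int& fresh_stack_id) {
    if (next.stack != -1) return;
    if (next.type == cur.type) {
        next.stack = cur.stack;
        cur.stack = -1;  // like cur_meta->release_stack()
    } else {
        next.stack = fresh_stack_id++;  // like get_stack(); failure path omitted
    }
}
```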
Main tid
Now we explain main_tid/main_stack (mentioned earlier):
A TaskGroup is bound to a pthread. While executing a kthread, the pthread runs on the kthread's stack; at all other times, it runs on its own native stack. brpc does not allocate a new stack for the pthread; it only records the context of the pthread's native stack. main_stack refers to this pthread stack, and main_tid uniquely identifies the pthread's main task.
Let’s examine how this is implemented:
int TaskGroup::init(size_t runqueue_capacity) {
    ...
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    ...
}
TaskGroup::init above calls get_stack(STACK_TYPE_MAIN, NULL):
- STACK_TYPE_MAIN is the type of main_stack.
- get_stack calls StackFactory::get_stack. StackFactory is a template class that allocates stack space by default, but it is specialized for MainStackClass, which corresponds to STACK_TYPE_MAIN. The specialization allocates no stack space and returns only a ContextualStack object.
template <> struct StackFactory<MainStackClass> {
    static ContextualStack* get_stack(void (*)(intptr_t)) {
        ContextualStack* s = new (std::nothrow) ContextualStack;
        if (NULL == s) {
            return NULL;
        }
        s->context = NULL;
        s->stacktype = STACK_TYPE_MAIN;
        s->storage.zeroize();
        return s;
    }
    static void return_stack(ContextualStack* s) {
        delete s;
    }
};
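The same pattern, a primary template that allocates and a full specialization that does not, is sketched below. ToyStackFactory, ToyContextualStack, ToySmallStackClass and ToyMainStackClass are hypothetical. Plain operator new stands in for whatever allocation the real factory uses, and the step of building an initial context for the entry function is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

struct ToyContextualStack {
    void* context = nullptr;   // filled in by the first switch away (main) or at creation
    void* storage = nullptr;   // start of allocated stack memory, if any
    std::size_t size = 0;
};

struct ToySmallStackClass { static constexpr std::size_t kSize = 64 * 1024; };
struct ToyMainStackClass {};

// Primary template: really allocates stack memory.
template <typename StackClass>
struct ToyStackFactory {
    static ToyContextualStack* get_stack() {
        ToyContextualStack* s = new (std::nothrow) ToyContextualStack;
        if (s == nullptr) return nullptr;
        s->storage = ::operator new(StackClass::kSize, std::nothrow);
        if (s->storage == nullptr) {
            delete s;
            return nullptr;
        }
        s->size = StackClass::kSize;
        return s;
    }
    static void return_stack(ToyContextualStack* s) {
        ::operator delete(s->storage);
        delete s;
    }
};

// Full specialization for the pthread's own stack: nothing to allocate.
template <>
struct ToyStackFactory<ToyMainStackClass> {
    static ToyContextualStack* get_stack() {
        return new (std::nothrow) ToyContextualStack;
    }
    static void return_stack(ToyContextualStack* s) { delete s; }
};
```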
During the switch to kthread execution, jump_stack(cur_meta->stack, next_meta->stack) is called:
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
    kthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}
- cur_meta is the TaskMeta corresponding to main_tid (the pthread's metadata).
- next_meta is the metadata of the kthread to be executed.
As explained in previous documentation, kthread_jump_fcontext pushes the current register state onto the current stack (the pthread stack). It then stores the stack pointer (rsp on x86-64) into the location passed in rdi, the first argument, which is &from->context. Thus, after the first switch, main_tid's stack context points into the pthread's native stack.
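kthread_jump_fcontext itself is written in assembly, but POSIX ucontext on Linux/glibc shows the same idea: the main stack needs no allocation, only a slot that the first switch fills in. This is an analogy, not kthread's mechanism. getcontext, makecontext and swapcontext stand in for kthread_jump_fcontext, g_main_ctx plays the role of main_stack's context, and every name is hypothetical.

```cpp
#include <cassert>
#include <string>
#include <ucontext.h>
#include <vector>

// Like main_stack: no memory allocated, only a context saved on the first switch.
static ucontext_t g_main_ctx;
// Like a kthread stack created by get_stack, with its own memory.
static ucontext_t g_task_ctx;
alignas(16) static char g_task_stack[64 * 1024];
static std::vector<std::string> g_trace;

static void toy_task_entry() {
    g_trace.push_back("task: started on its own stack");
    swapcontext(&g_task_ctx, &g_main_ctx);  // jump back to the pthread stack
    g_trace.push_back("task: resumed");
}  // returning follows uc_link back to g_main_ctx

std::vector<std::string> run_ucontext_demo() {
    g_trace.clear();
    getcontext(&g_task_ctx);
    g_task_ctx.uc_stack.ss_sp = g_task_stack;
    g_task_ctx.uc_stack.ss_size = sizeof(g_task_stack);
    g_task_ctx.uc_link = &g_main_ctx;
    makecontext(&g_task_ctx, toy_task_entry, 0);

    // First switch: the pthread's current registers are saved into g_main_ctx.
    swapcontext(&g_main_ctx, &g_task_ctx);
    g_trace.push_back("main: back on the pthread stack");
    swapcontext(&g_main_ctx, &g_task_ctx);
    g_trace.push_back("main: task finished");
    return g_trace;
}
```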
Key Interfaces
Next, we examine the interfaces provided by kthread, using kthread_start_urgent and kthread_start_background as examples:
- kthread_start_urgent: treats the new kthread as "high priority".
- kthread_start_background: treats the new kthread as "low priority".
The meaning of "priority" is explained below.
kthread_start_urgent
int kthread_start_urgent(kthread_t* __restrict tid,
                         const kthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
    kthread::TaskGroup* g = kthread::tls_task_group;
    if (g) {
        // start from worker
        return kthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return kthread::start_from_non_worker(tid, attr, fn, arg);
}
Recall that tls_task_group is a TLS variable, and ordinary pthreads have a null tls_task_group. Let's first analyze the flow for ordinary pthreads, which call start_from_non_worker.
BUTIL_FORCE_INLINE int
start_from_non_worker(kthread_t* __restrict tid,
                      const kthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
    TaskControl* c = get_or_new_task_control();
    if (NULL == c) {
        return ENOMEM;
    }
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
        // 1. NOSIGNAL is often for creating many kthreads in batch,
        //    inserting into the same TaskGroup maximizes the batch.
        // 2. kthread_flush() needs to know which TaskGroup to flush.
        TaskGroup* g = tls_task_group_nosignal;
        if (NULL == g) {
            g = c->choose_one_group();
            tls_task_group_nosignal = g;
        }
        return g->start_background<true>(tid, attr, fn, arg);
    }
    return c->choose_one_group()->start_background<true>(
        tid, attr, fn, arg);
}
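start_from_non_worker retrieves the TaskControl singleton, creating it if it does not exist yet (which also initializes a set of TaskGroups). It then selects a TaskGroup and calls start_background<true>. For BTHREAD_NOSIGNAL tasks, the selected group is cached in tls_task_group_nosignal, so a whole batch of such tasks goes to the same TaskGroup.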
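The NOSIGNAL group caching can be modeled with a thread_local pointer that is filled on first use. This is a hypothetical sketch. ToyGroup, tls_nosignal_group, toy_choose_group and toy_start_from_non_worker are illustrative names, and the round-robin chooser (not thread-safe) replaces whatever policy choose_one_group() actually uses.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a TaskGroup's remote queue.
struct ToyGroup {
    std::vector<int> remote_rq;
};

// Mirrors tls_task_group_nosignal: the group this pthread uses for NOSIGNAL batches.
thread_local ToyGroup* tls_nosignal_group = nullptr;

// Hypothetical chooser: simple round-robin (single-threaded use only).
ToyGroup* toy_choose_group(std::vector<ToyGroup>& groups) {
    static std::size_t next = 0;
    return &groups[next++ % groups.size()];
}

// Group selection in the spirit of start_from_non_worker.
ToyGroup* toy_start_from_non_worker(std::vector<ToyGroup>& groups, int task, bool nosignal) {
    ToyGroup* g = nullptr;
    if (nosignal) {
        if (tls_nosignal_group == nullptr) {
            tls_nosignal_group = toy_choose_group(groups);  // pick once per pthread
        }
        g = tls_nosignal_group;  // same group for the whole batch
    } else {
        g = toy_choose_group(groups);
    }
    g->remote_rq.push_back(task);
    return g;
}
```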
template <bool REMOTE>
int TaskGroup::start_background(kthread_t* __restrict th,
                                const kthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    ...
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    ...
    m->fn = fn;
    m->arg = arg;
    ...
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}
- REMOTE: a template flag indicating whether the kthread is created by an ordinary pthread (true) or by a kthread worker (false).
- Core logic: create a TaskMeta for the new kthread, then call ready_to_run_remote to add the tid to the TaskGroup's _remote_rq (or, when REMOTE is false, ready_to_run to add it to _rq).
Now analyze the flow when kthread_start_urgent is called by a kthread worker (i.e., creating a kthread from within a kthread):
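- tls_task_group is non-null, so start_foreground is called.
- start_foreground creates a TaskMeta and immediately switches execution to the new kthread (hence "high priority"). The exception is when the current task is a pthread task (is_current_pthread_task()); then the new kthread is only pushed to _rq via ready_to_run.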
int TaskGroup::start_foreground(TaskGroup** pg,
                                kthread_t* __restrict th,
                                const kthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    ...
    TaskGroup* g = *pg;
    g->_control->_nkthreads << 1;
    if (g->is_current_pthread_task()) {
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        // NOSIGNAL affects current task, not the new task.
        RemainedFn fn = NULL;
        if (g->current_task()->about_to_quit) {
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
            fn = ready_to_run_in_worker;
        }
        ReadyToRunArgs args = {
            g->current_tid(),
            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
        };
        g->set_remained(fn, &args);
        TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
}
At the end of start_foreground:
- The remained logic of the current TaskGroup is set. As mentioned earlier, task_runner executes the remained logic before running the kthread's core function.
- start_foreground preempts the current kthread. After the switch, the remained logic (ready_to_run_in_worker, or ready_to_run_in_worker_ignoresignal if the current task is about to quit) pushes the current kthread back to _rq, where it waits to run again.
kthread_start_background
- For ordinary pthreads: the flow is identical to kthread_start_urgent (start_from_non_worker → start_background<true>).
- For kthread workers: start_background<false> is called. After creating the TaskMeta, it calls ready_to_run to push the kthread to _rq instead of running it immediately (hence "low priority").
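When called from a worker, the two entry points differ in what runs next and what ends up in _rq. A one-worker model shows this. ToyWorker and its methods are hypothetical, and the model ignores stacks, the exact timing of the remained callback, and the pthread-task special case.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical one-worker model: tasks are names; `current` is what the worker runs.
struct ToyWorker {
    std::string current;
    std::deque<std::string> rq;

    // kthread_start_background from a worker (start_background<false>):
    // queue the new task; the caller keeps running.
    void start_background(const std::string& task) {
        rq.push_back(task);
    }

    // kthread_start_urgent from a worker (start_foreground):
    // switch to the new task now and put the caller back in the run queue.
    // In kthread the re-queue is done by the remained callback after the
    // switch; here both steps simply happen in one call.
    void start_foreground(const std::string& task) {
        std::string caller = current;
        current = task;
        rq.push_back(caller);
    }
};
```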