kthread Scheduling and Execution Flow

Overall Flow

TaskGroup is responsible for scheduling and executing kthreads. Each TaskGroup corresponds to one pthread and contains two execution queues: _rq and _remote_rq, which store pending kthreads. Kthreads created by other kthreads are placed in _rq, while kthreads created by pthreads are placed in _remote_rq. TaskControl is a global singleton that manages multiple TaskGroup instances internally.

Core Interfaces

TaskControl

TaskControl is a singleton. Below is its initialization process—the core logic is to create _concurrency worker (kthread_worker) threads, with each worker executing the worker_thread function:

int TaskControl::init(int concurrency) {
    _concurrency = concurrency;
    _workers.resize(_concurrency);
    for (int i = 0; i < _concurrency; ++i) {
        const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
        ...
    }
    ...
}

The logic of worker_thread is to create a TaskGroup g via create_group, add it to TaskControl, and set tls_task_group (a Thread-Local Storage, TLS variable) to g. Only worker threads have a non-null tls_task_group. It then executes TaskGroup's run_main_task function:

void* TaskControl::worker_thread(void* arg) {
    TaskControl* c = static_cast<TaskControl*>(arg);
    TaskGroup* g = c->create_group();
    ...
    tls_task_group = g;
    c->_nworkers << 1;
    g->run_main_task();
    ...
}

TaskGroup* TaskControl::create_group() {
    ...
    g->init(FLAGS_task_group_runqueue_capacity);
    ...
}

TaskGroup

Each TaskGroup maps to one pthread. Its initialization function is shown below: it creates rq and remote_rq (both queues for pending kthreads), then initializes main_stack and main_tid. main_tid represents the kthread ID of the main execution flow, and the roles of main_stack and main_tid are explained in detail later. TaskMeta stores metadata for a kthread (e.g., execution function, arguments, local storage). Here, cur_meta is set to the TaskMeta corresponding to main_tid.

int TaskGroup::init(size_t runqueue_capacity) {
    _rq.init(runqueue_capacity);
    _remote_rq.init(runqueue_capacity / 2);
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    ...
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
    ...
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);

    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}

Each worker runs in an infinite while loop: wait_task returns the tid of a runnable kthread if available, otherwise the worker blocks. The logic of wait_task is to first pop from the current TaskGroup's _remote_rq; if empty, it steals (pops) from the _rq and _remote_rq of other TaskGroup instances.

void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

    TaskGroup* dummy = this;
    kthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
    }
}

Once a runnable tid is obtained, sched_to is called. It first retrieves the TaskMeta corresponding to the tid:

  • If no stack has been allocated for this TaskMeta yet, one is allocated via get_stack, with the task_runner function as the stack's entry point.
  • It then calls sched_to(pg, next_meta), whose core logic is to jump to next_meta via jump_stack(cur_meta->stack, next_meta->stack).
inline void TaskGroup::sched_to(TaskGroup** pg, kthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
            // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
            // This basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}

The core logic of task_runner is as follows:

  1. Execute the remain function (preliminary work a kthread must complete before running its core logic—explained later).
  2. Execute the function stored in the TaskMeta. Since the kthread may be scheduled to another worker during execution, the TaskGroup may change—so g is reinitialized after execution.
  3. Finally, call ending_sched.
void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    // different groups.
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
    }

    do {
        TaskMeta* const m = g->_cur_meta;
        void* thread_return = NULL;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }

        // Group is probably changed
        g = tls_task_group;
        ...
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);

    // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
    // tasks to run, quit for more tasks.
}

ending_sched attempts to fetch a runnable kthread:

  • If none is found, the next execution target is the TaskMeta corresponding to main_tid.
  • It then jumps to next_meta via sched_to(pg, next_meta) (described above).
void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    kthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
    // to 2.9%
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }

    TaskMeta* const cur_meta = g->_cur_meta;
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // also works with pthread_task scheduling to pthread_task, the
            // transfered stack is just _main_stack.
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
                // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
                // This basically means that if we can't allocate stack, run
                // the task in pthread directly.
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}

Main tid

Now we explain main_tid/main_stack (mentioned earlier): A TaskGroup is bound to a pthread. When executing a kthread, the pthread runs on the kthread's stack; at all other times, it runs on the pthread's native stack. brpc does not allocate a new stack for the pthread—it only records the address of the pthread's native stack. main_stack refers to this pthread stack, and main_tid uniquely identifies the pthread.

Let’s examine how this is implemented:

int TaskGroup::init(size_t runqueue_capacity) {
    ...
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    ...
}

In TaskGroup::init above, we see ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);:

  • STACK_TYPE_MAIN is the type of main_stack.
  • get_stack calls StackFactory::get_stack. StackFactory is a template class that allocates stack space by default, but it is specialized for STACK_TYPE_MAIN: no stack space is allocated—only a ContextualStack object is returned.
template <> struct StackFactory<MainStackClass> {
    static ContextualStack* get_stack(void (*)(intptr_t)) {
        ContextualStack* s = new (std::nothrow) ContextualStack;
        if (NULL == s) {
            return NULL;
        }
        s->context = NULL;
        s->stacktype = STACK_TYPE_MAIN;
        s->storage.zeroize();
        return s;
    }

    static void return_stack(ContextualStack* s) {
        delete s;
    }
};

During the switch to kthread execution, jump_stack(cur_meta->stack, next_meta->stack) is called:

inline void jump_stack(ContextualStack* from, ContextualStack* to) {
    kthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}
  • cur_meta is the TaskMeta corresponding to main_tid (the pthread's metadata).
  • next_meta is the metadata of the kthread to be executed.

As explained in previous documentation, kthread_jump_fcontext pushes the current register state onto the current stack (the pthread's native stack) and stores rsp (the stack pointer) into the location pointed to by rdi (i.e., into from->context). Thus, main_tid's context records the pthread's native stack.

Key Interfaces

Next, we examine the interfaces provided by kthread, using kthread_start_urgent and kthread_start_background as examples:

  • kthread_start_urgent: Treats the new kthread as "high priority".
  • kthread_start_background: Treats the new kthread as "low priority".

The meaning of "priority" is explained below.

kthread_start_urgent
int kthread_start_urgent(kthread_t* __restrict tid,
                         const kthread_attr_t* __restrict attr,
                         void* (*fn)(void*),
                         void* __restrict arg) {
    kthread::TaskGroup* g = kthread::tls_task_group;
    if (g) {
        // start from worker
        return kthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return kthread::start_from_non_worker(tid, attr, fn, arg);
}

Recall that tls_task_group is a TLS variable—ordinary pthreads have a null tls_task_group. Let’s first analyze the flow for ordinary pthreads: they call start_from_non_worker.

BUTIL_FORCE_INLINE int
start_from_non_worker(kthread_t* __restrict tid,
                      const kthread_attr_t* __restrict attr,
                      void* (*fn)(void*),
                      void* __restrict arg) {
    TaskControl* c = get_or_new_task_control();
    if (NULL == c) {
        return ENOMEM;
    }
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
        // 1. NOSIGNAL is often for creating many kthreads in batch,
        //    inserting into the same TaskGroup maximizes the batch.
        // 2. kthread_flush() needs to know which TaskGroup to flush.
        TaskGroup* g = tls_task_group_nosignal;
        if (NULL == g) {
            g = c->choose_one_group();
            tls_task_group_nosignal = g;
        }
        return g->start_background<true>(tid, attr, fn, arg);
    }
    return c->choose_one_group()->start_background<true>(
        tid, attr, fn, arg);
}

start_from_non_worker attempts to retrieve the TaskControl singleton (creates it if missing, initializing a set of TaskGroups). It then selects a TaskGroup and calls start_background<true>.

template <bool REMOTE>
int TaskGroup::start_background(kthread_t* __restrict th,
                                const kthread_attr_t* __restrict attr,
                                void* (*fn)(void*),
                                void* __restrict arg) {
    ...
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    ...
    m->fn = fn;
    m->arg = arg;
    ...
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}
  • REMOTE: A template flag indicating whether the kthread is created by an ordinary pthread (true) or a kthread worker (false).
  • Core logic: Create a TaskMeta for the new kthread, then call ready_to_run_remote to add the tid to the TaskGroup's remote_rq.

Now analyze the flow when kthread_start_urgent is called by a kthread worker (i.e., creating a kthread from within a kthread):

  • tls_task_group is non-null, so start_foreground is called.
  • start_foreground creates a TaskMeta and immediately switches execution to the new kthread (hence "high priority").
int TaskGroup::start_foreground(TaskGroup** pg,
                                kthread_t* __restrict th,
                                const kthread_attr_t* __restrict attr,
                                void* (*fn)(void*),
                                void* __restrict arg) {
    ...
    TaskGroup* g = *pg;
    g->_control->_nkthreads << 1;
    if (g->is_current_pthread_task()) {
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        // NOSIGNAL affects current task, not the new task.
        RemainedFn fn = NULL;
        if (g->current_task()->about_to_quit) {
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
            fn = ready_to_run_in_worker;
        }
        ReadyToRunArgs args = {
            g->current_tid(),
            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
        };
        g->set_remained(fn, &args);
        TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
}

At the end of start_foreground:

  • The remained logic of the current TaskGroup is set. As mentioned earlier, task_runner executes the remained logic before running the kthread's core function.
  • start_foreground preempts the current kthread's execution: the current kthread is pushed back to rq via the remained logic to wait for re-execution.
kthread_start_background
  • For ordinary pthreads: The flow is identical to kthread_start_urgent (start_from_non_worker calls start_background<true>).
  • For kthread workers: Calls start_background<false>. After creating the TaskMeta, it calls ready_to_run to push the kthread to rq (instead of immediate execution—hence "low priority").