跳到主要内容

kthread 调度与执行流程

总体流程

TaskGroup 负责调度和执行 kthread。每个 TaskGroup 对应一个 pthread,并包含两个执行队列:_rq_remote_rq,用于存放待执行的 kthread。由其他 kthread 创建的 kthread 会放入 _rq,而由 pthread 创建的 kthread 会放入 _remote_rqTaskControl 是全局单例,内部管理多个 TaskGroup 实例。

import kthread from '@site/static/img/kthread/kthread.png';

<img src={kthread}/>

核心接口

TaskControl

TaskControl 是单例。初始化过程如下,其核心逻辑是创建 _concurrency 个 worker(kthread_worker)线程,每个 worker 执行 worker_thread 函数:

int TaskControl::init(int concurrency) {
_concurrency = concurrency;
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
...
}
...
}

worker_thread 的逻辑是通过 create_group 创建一个 TaskGroup g,将其加入 TaskControl,并将 tls_task_group(线程局部变量 TLS)设置为 g。只有 worker 线程的 tls_task_group 非空。随后执行 TaskGrouprun_main_task 函数:

void* TaskControl::worker_thread(void* arg) {
TaskControl* c = static_cast<TaskControl*>(arg);
TaskGroup* g = c->create_group();
...
tls_task_group = g;
c->_nworkers << 1;
g->run_main_task();
...
}
TaskGroup* TaskControl::create_group() {
...
g->init(FLAGS_task_group_runqueue_capacity);
...
}

TaskGroup

每个 TaskGroup 对应一个 pthread。初始化函数如下:它创建了 _rq_remote_rq(用于存放待执行 kthread 的队列),然后初始化 main_stackmain_tidmain_tid 表示主执行流的 kthread ID,main_stackmain_tid 的具体作用在后面详细说明。TaskMeta 用于存储 kthread 的元信息(例如执行函数、参数、本地存储等)。这里的 _cur_meta 设置为 main_tid 对应的 TaskMeta

int TaskGroup::init(size_t runqueue_capacity) {
_rq.init(runqueue_capacity);
_remote_rq.init(runqueue_capacity / 2);
ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
...
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
...
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = NULL;
m->arg = NULL;
m->local_storage = LOCAL_STORAGE_INIT;
m->cpuwide_start_ns = butil::cpuwide_time_ns();
m->stat = EMPTY_STAT;
m->attr = BTHREAD_ATTR_TASKGROUP;
m->tid = make_tid(*m->version_butex, slot);
m->set_stack(stk);

_cur_meta = m;
_main_tid = m->tid;
_main_stack = stk;
_last_run_ns = butil::cpuwide_time_ns();
return 0;
}

每个 worker 都运行在一个无限 while 循环中:wait_task 返回可运行 kthread 的 tid,如果没有可运行的 kthread,则 worker 阻塞。wait_task 的逻辑是先从当前 TaskGroup_remote_rq 弹出;如果为空,则从其他 TaskGroup_rq_remote_rq 中窃取任务。

void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

TaskGroup* dummy = this;
kthread_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
}
}

一旦获得可运行的 tid,就会调用 sched_to。它首先获取对应 tid 的 TaskMeta

  • 如果已经分配了栈,则调用 sched_to(pg, next_meta),其核心逻辑是通过 jump_stack(cur_meta->stack, next_meta->stack) 跳转到 next_meta
  • 否则,分配一个新的栈,通过 get_stack 获取,栈的入口函数设置为 task_runner
inline void TaskGroup::sched_to(TaskGroup** pg, kthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
// In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
// If stack allocation fails, run the task directly in pthread.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
}
}
// Update now_ns only when wait_task did yield.
sched_to(pg, next_meta);
}

task_runner 的核心逻辑如下:

  1. 执行 remained 函数(kthread 在执行核心逻辑前必须完成的准备工作,稍后解释)。
  2. 执行 TaskMeta 中存储的函数。由于 kthread 可能在执行过程中被调度到其他 worker,执行完后需要重新获取 TaskGroup
  3. 最后调用 ending_sched
void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
TaskGroup* g = tls_task_group;

if (!skip_remained) {
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
}

do {

TaskMeta* const m = g->_cur_meta;
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}

// Group is probably changed
g = tls_task_group;
...
g->set_remained(TaskGroup::_release_last_context, m);
ending_sched(&g);

} while (g->_cur_meta->tid != g->_main_tid);

// Called from a pthread with no BTHREAD_STACKTYPE_PTHREAD tasks left
// Quit to wait for more tasks.
}

ending_sched 尝试获取一个可运行的 kthread:

  • 如果没有找到,下一次执行目标是对应 main_tidTaskMeta
  • 然后通过 sched_to(next_meta) 跳转(如上所述)。
void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
kthread_t next_tid = 0;
#ifndef BTHREAD_FAIR_WSQ
const bool popped = g->_rq.pop(&next_tid);
#else
const bool popped = g->_rq.steal(&next_tid);
#endif
if (!popped && !g->steal_task(&next_tid)) {
next_tid = g->_main_tid;
}

TaskMeta* const cur_meta = g->_cur_meta;
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
if (next_meta->stack_type() == cur_meta->stack_type()) {
next_meta->set_stack(cur_meta->release_stack());
} else {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack(g->_main_stack);
}
}
}
sched_to(pg, next_meta);
}

Main tid

解释 main_tid / main_stack

  • TaskGroup 绑定到一个 pthread。
  • 当执行 kthread 时,pthread 在 kthread 的栈上运行;其他时间在 pthread 的原生栈上运行。
  • brpc 不为 pthread 分配新栈,只记录 pthread 原生栈地址。
  • main_stack 指向 pthread 的栈,main_tid 唯一标识该 pthread。
int TaskGroup::init(size_t runqueue_capacity) {
...
ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
...
}

STACK_TYPE_MAIN 表示 main_stack 类型。get_stack 调用 StackFactory::get_stack。默认情况下,StackFactory 分配栈空间,但对 STACK_TYPE_MAIN 做了特殊化:不分配栈,只返回一个 ContextualStack 对象。

template <> struct StackFactory<MainStackClass> {
static ContextualStack* get_stack(void (*)(intptr_t)) {
ContextualStack* s = new (std::nothrow) ContextualStack;
if (NULL == s) {
return NULL;
}
s->context = NULL;
s->stacktype = STACK_TYPE_MAIN;
s->storage.zeroize();
return s;
}

static void return_stack(ContextualStack* s) {
delete s;
}
};

切换到 kthread 执行时,调用 jump_stack(cur_meta->stack, next_meta->stack)

inline void jump_stack(ContextualStack* from, ContextualStack* to) {
kthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}
  • cur_metamain_tid 对应的 TaskMeta(pthread 的元信息)。
  • next_meta 是待执行 kthread 的元信息。 kthread_jump_fcontext 将当前寄存器状态推入 当前栈(pthread 栈) 并更新 esp,因此 main_tid 的栈指向 pthread 原生栈。

关键接口

kthread_start_urgentkthread_start_background 为例:

  • kthread_start_urgent:新 kthread 为高优先级。
  • kthread_start_background:新 kthread 为低优先级。
kthread_start_urgent
int kthread_start_urgent(kthread_t* __restrict tid,
const kthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
kthread::TaskGroup* g = kthread::tls_task_group;
if (g) {
return kthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
}
return kthread::start_from_non_worker(tid, attr, fn, arg);
}
  • tls_task_group 是 TLS 变量,普通 pthread 的值为空。
  • 普通 pthread 调用 start_from_non_worker 流程如下:
BUTIL_FORCE_INLINE int
start_from_non_worker(kthread_t* __restrict tid,
const kthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
}
if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
TaskGroup* g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
return g->start_background<true>(tid, attr, fn, arg);
}
return c->choose_one_group()->start_background<true>(
tid, attr, fn, arg);
}
  • 获取或创建 TaskControl 单例,选择一个 TaskGroup,调用 start_background<true>
template <bool REMOTE>
int TaskGroup::start_background(kthread_t* __restrict th,
const kthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
...
TaskMeta* m = butil::get_resource(&slot);
m->fn = fn;
m->arg = arg;
...
if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
}
return 0;
}
  • REMOTE:模板参数,表示 kthread 是否由普通 pthread 创建。

  • 核心逻辑:创建 TaskMeta,然后调用 ready_to_run_remote 将 tid 加入 remote_rq

  • 当 kthread worker 调用 kthread_start_urgent 时:

  • tls_task_group 非空,调用 start_foreground

  • 创建 TaskMeta立即切换执行到新 kthread(高优先级)。

int TaskGroup::start_foreground(TaskGroup** pg,
kthread_t* __restrict th,
const kthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
TaskGroup* g = *pg;
g->_control->_nkthreads << 1;
if (g->is_current_pthread_task()) {
g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
RemainedFn fn = NULL;
if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
} else {
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = { g->current_tid(),
(bool)(using_attr.flags & BTHREAD_NOSIGNAL) };
g->set_remained(fn, &args);
TaskGroup::sched_to(pg, m->tid);
}
return 0;
}
  • remained 逻辑会在 task_runner 中先执行。
  • start_foreground 会抢占当前 kthread,将当前 kthread 推回 _rq 等待下次执行。
kthread_start_background
  • 普通 pthread:流程与 kthread_start_urgent 相同(调用 start_from_non_workerstart_background<true>)。
  • kthread worker:调用 start_background<false>,将 kthread 加入 _rq(低优先级,非立即执行)。