ExecutionQueue 使用总结
ExecutionQueue 类似 Kylin 的 ExecMan,实现了异步串行任务执行能力。它最初用于 RPC 多线程写同一个文件描述符,后来在 kthread 中集成(r31345)。核心特性:
- 异步有序执行:任务在独立线程中按提交顺序执行。
- 多生产者支持:多个线程可同时向同一个队列提交任务。
- 任务取消:支持已提交任务的取消。
- 队列终止:可优雅停止队列。
- 高优先级抢占:可将高优先级任务插入到普通任务前执行。
核心区别于 ExecMan
| 特性 | ExecutionQueue | ExecMan |
|---|---|---|
| 提交接口 | 无锁等待(wait-free) | 依赖锁 |
| 批量处理 | 支持批量执行,提高数据局部性 | 每次处理不同 AsyncClient,缓存抖动严重 |
| 线程绑定 | 不绑定固定线程,可独立调度多个队列 | 任务映射到固定线程,跨队列调度受限 |
| kthread 兼容性 | 可安全使用 kthread 原语,不阻塞 pthread | 避免阻塞原语 |
背景:ExecutionQueue 本质上是基于 消息传递(Message Passing) 的资源管理机制,将业务逻辑拆成独立 actor,每个 actor 处理特定资源,消息驱动资源修改,实现同步或异步处理。

ExecutionQueue vs Mutex
优势:
- 角色清晰,避免死锁等锁相关问题
- FIFO 顺序保证执行顺序
- 无线程闲置,提高资源利用率
- 高负载下批量处理效率更高
缺点:
- 逻辑分散,代码理解成本高
- 跨 ExecutionQueue 的任务拆分增加核心间开销
- 多资源原子操作复杂,需要额外调度队列
- 队列单线程处理,慢任务阻塞后续任务
- 高并发下可能积累大量待执行任务,占用内存
选择建议:
- 小临界区 & 轻度竞争 → Mutex 最优
- 严格顺序执行或高竞争 → ExecutionQueue 更合适
注意:Linux 上无竞争的 mutex lock/unlock 只需几条原子指令,开销可忽略。
使用方法
1. 实现执行函数
template <typename T>
class TaskIterator;
template <typename T>
int demo_execute(void* meta, TaskIterator<T>& iter) {
if (iter.is_queue_stopped()) {
// meta 资源释放
return 0;
}
for (; iter; ++iter) {
// 执行业务逻辑,例如 do_something(meta, *iter)
}
return 0;
}
2. 启动 ExecutionQueue
struct ExecutionQueueOptions {
bool use_pthread = false; // 使用 pthread 而非 kthread
kthread_attr_t kthread_attr = KTHREAD_ATTR_NORMAL;
Executor* executor = nullptr; // 指定 executor
};
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>& iter),
void* meta);
- 返回值为 64 位 ID,可作为弱引用定位 ExecutionQueue
meta生命周期需覆盖队列整个生命周期
3. 停止 ExecutionQueue
template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);
template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);
execution_queue_stop线程安全,可多次调用meta可在iter.is_queue_stopped() == true或execution_queue_join后释放
4. 提交任务
struct TaskOptions {
bool high_priority = false; // 高优先级任务
bool in_place_if_possible = false; // 可能直接在调用线程执行
};
const TaskOptions TASK_OPTIONS_NORMAL;
const TaskOptions TASK_OPTIONS_URGENT{true};
const TaskOptions TASK_OPTIONS_INPLACE{false, true};
// 普通提交
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id, T&& task);
// 提交带选项
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id, T&& task,
const TaskOptions* options,
TaskHandle* handle = nullptr);
- 高优先级任务在队列内严格 FIFO
in_place_if_possible可消除线程调度开销,但可能导致死锁或递归,需要谨慎使用
5. 取消已提交任务
int execution_queue_cancel(const TaskHandle& h);
// 返回值:
// -1: 已执行或 handle 无效
// 0: 取消成功
// 1: 正在执行
返回非零不保证逻辑任务完成,需要额外业务逻辑确认任务终止。