跳到主要内容

ExecutionQueue 使用总结

ExecutionQueue 类似 Kylin 的 ExecMan,实现了异步串行任务执行能力。它最初用于 RPC 多线程写同一个文件描述符,后来在 kthread 中集成(r31345)。核心特性:

  • 异步有序执行:任务在独立线程中按提交顺序执行。
  • 多生产者支持:多个线程可同时向同一个队列提交任务。
  • 任务取消:支持已提交任务的取消。
  • 队列终止:可优雅停止队列。
  • 高优先级抢占:可将高优先级任务插入到普通任务前执行。

核心区别于 ExecMan

特性ExecutionQueueExecMan
提交接口无锁等待(wait-free)依赖锁
批量处理支持批量执行,提高数据局部性每次处理不同 AsyncClient,缓存抖动严重
线程绑定不绑定固定线程,可独立调度多个队列任务映射到固定线程,跨队列调度受限
kthread 兼容性可安全使用 kthread 原语,不阻塞 pthread避免阻塞原语

背景:ExecutionQueue 本质上是基于 消息传递(Message Passing) 的资源管理机制,将业务逻辑拆成独立 actor,每个 actor 处理特定资源,消息驱动资源修改,实现同步或异步处理。

img


ExecutionQueue vs Mutex

优势:

  • 角色清晰,避免死锁等锁相关问题
  • FIFO 顺序保证执行顺序
  • 无线程闲置,提高资源利用率
  • 高负载下批量处理效率更高

缺点:

  • 逻辑分散,代码理解成本高
  • 跨 ExecutionQueue 的任务拆分增加核心间开销
  • 多资源原子操作复杂,需要额外调度队列
  • 队列单线程处理,慢任务阻塞后续任务
  • 高并发下可能积累大量待执行任务,占用内存

选择建议:

  • 小临界区 & 轻度竞争 → Mutex 最优
  • 严格顺序执行或高竞争 → ExecutionQueue 更合适

注意:Linux 上无竞争的 mutex lock/unlock 只需几条原子指令,开销可忽略。


使用方法

1. 实现执行函数

template <typename T>
class TaskIterator;

template <typename T>
int demo_execute(void* meta, TaskIterator<T>& iter) {
if (iter.is_queue_stopped()) {
// meta 资源释放
return 0;
}
for (; iter; ++iter) {
// 执行业务逻辑,例如 do_something(meta, *iter)
}
return 0;
}

2. 启动 ExecutionQueue

struct ExecutionQueueOptions {
bool use_pthread = false; // 使用 pthread 而非 kthread
kthread_attr_t kthread_attr = KTHREAD_ATTR_NORMAL;
Executor* executor = nullptr; // 指定 executor
};

template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>& iter),
void* meta);
  • 返回值为 64 位 ID,可作为弱引用定位 ExecutionQueue
  • meta 生命周期需覆盖队列整个生命周期

3. 停止 ExecutionQueue

template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);

template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);
  • execution_queue_stop 线程安全,可多次调用
  • meta 可在 iter.is_queue_stopped() == trueexecution_queue_join 后释放

4. 提交任务

struct TaskOptions {
bool high_priority = false; // 高优先级任务
bool in_place_if_possible = false; // 可能直接在调用线程执行
};

const TaskOptions TASK_OPTIONS_NORMAL;
const TaskOptions TASK_OPTIONS_URGENT{true};
const TaskOptions TASK_OPTIONS_INPLACE{false, true};

// 普通提交
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id, T&& task);

// 提交带选项
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id, T&& task,
const TaskOptions* options,
TaskHandle* handle = nullptr);
  • 高优先级任务在队列内严格 FIFO
  • in_place_if_possible 可消除线程调度开销,但可能导致死锁或递归,需要谨慎使用

5. 取消已提交任务

int execution_queue_cancel(const TaskHandle& h);
// 返回值:
// -1: 已执行或 handle 无效
// 0: 取消成功
// 1: 正在执行

返回非零不保证逻辑任务完成,需要额外业务逻辑确认任务终止。