Execution Queue
Similar to Kylin's ExecMan, ExecutionQueue provides the capability for asynchronous serial task execution. The underlying technology of ExecutionQueue was first applied in RPC to implement multi-threaded writing to the same file descriptor. It was integrated into kthread after revision r31345. ExecutionQueue offers the following core features:
- Asynchronous Ordered Execution: Tasks are executed in a dedicated separate thread, strictly following the order of submission.
- Multi-Producer Support: Multiple threads can submit tasks to a single ExecutionQueue concurrently.
- Task Cancellation: Allows cancellation of tasks that have already been submitted.
- Queue Termination: Supports graceful stopping of the execution queue.
- High-Priority Task Preemption: Permits high-priority tasks to be inserted ahead of normal-priority tasks in the queue.
Key differences from ExecMan:
- Wait-Free Submission Interface: The task submission interface of ExecutionQueue is wait-free, whereas ExecMan relies on locks. This means that during high system load, ExecutionQueue avoids global thread blocking caused by involuntary process context switches.
- Batch Processing Capability: The worker thread of ExecutionQueue can process submitted tasks in batches, achieving better data locality. In contrast, after an ExecMan thread finishes processing an AsyncContext from one AsyncClient, the next task is likely to belong to a different AsyncClient, leading to frequent CPU cache thrashing between resources associated with different AsyncClients.
- Unbound Execution Threads: ExecutionQueue tasks are not bound to fixed threads. ExecMan maps tasks to fixed worker threads based on AsyncClient hashing. Tasks across different ExecutionQueues are processed completely independently. When sufficient worker threads are available, all non-idle ExecutionQueues can be scheduled concurrently. Conversely, when thread resources are insufficient, ExecutionQueue cannot guarantee scheduling fairness. In such cases, the overall processing capacity should be scaled by dynamically increasing the number of kthread worker threads.
- kthread Compatibility: ExecutionQueue runs on kthread, enabling safe use of kthread synchronization primitives without the risk of blocking pthread execution. In ExecMan, however, synchronization primitives that are likely to cause blocking should be avoided as much as possible.
Background
In the field of multi-core concurrent programming, Message passing is widely adopted as a mechanism to resolve resource contention. It decomposes business logic into multiple independent actors based on resource dependencies, where each actor is responsible for managing a specific set of resources. When a workflow needs to modify a resource, it encapsulates the operation into a message and sends it to the corresponding actor. The actor (usually running in a separate execution context) processes the resource according to the message content, and then either wakes up the caller (synchronous processing) or forwards the result to the next actor in the pipeline (asynchronous processing).

ExecutionQueue Vs Mutex
Both ExecutionQueue and mutex can be used to eliminate race conditions in multi-threaded scenarios. Compared with mutex, ExecutionQueue has the following advantages:
- Clear Role Separation: The programming model is conceptually simple, eliminating the need to handle lock-related issues such as deadlocks.
- Guaranteed Execution Order: Ensures strict FIFO task execution, whereas mutex cannot guarantee the wake-up order of waiting threads.
- No Thread Idling: All threads are engaged in productive work without blocking on locks, improving overall resource utilization.
- Better Throughput Under High Load: Excels at batch task processing during system congestion, achieving higher overall throughput.
However, it also has notable drawbacks:
- Scattered Code Logic: The implementation of a single workflow is often split across multiple components, increasing the cost of code comprehension and maintenance.
- Increased Inter-Core Overhead: To improve concurrency, a single task is often split across multiple ExecutionQueues for pipeline processing. This leads to frequent inter-core context switches and cache synchronization overhead, which cannot be ignored especially when the critical section of the original task is very small.
- Complex Atomic Multi-Resource Operations: Coordinating atomic operations across multiple resources becomes more challenging. While mutex allows acquiring multiple locks simultaneously, ExecutionQueue requires an additional dispatch queue to achieve similar functionality.
- Single-Threaded Bottleneck: Since all tasks in an ExecutionQueue are processed sequentially, a slow-running task will block all subsequent tasks in the same queue.
- Complex Concurrency Control: ExecutionQueue may consume excessive memory due to accumulating pending tasks, requiring careful capacity management.
In theory, any system can be implemented using either mutex or ExecutionQueue alone to eliminate contention, regardless of performance and complexity trade-offs. However, for complex system design, it is recommended to flexibly choose the appropriate tool based on specific scenarios:
- Prefer Mutex for Small Critical Sections: If the critical section is small and contention is mild, mutex is the optimal choice. You can later use a contention profiler to determine if the mutex has become a performance bottleneck.
- Prefer ExecutionQueue for Ordered Execution or High Contention: When strict execution order is required, or when contention is unavoidable but can be mitigated through batch processing to improve throughput, ExecutionQueue is the better option.
In conclusion, there is no one-size-fits-all model for multi-threaded programming. The optimal solution requires balancing complexity and performance based on specific scenarios, combined with comprehensive profiling tools.
A critical note: On Linux, lock/unlock operations on a mutex with no contention only require a few atomic instructions, and their overhead is negligible in most scenarios.
Usage
Implement the Execution Function
// Iterate over the given tasks
//
// Example:
//
// #include <kthread/execution_queue.h>
//
// int demo_execute(void* meta, TaskIterator<T>& iter) {
// if (iter.is_queue_stopped()) {
// // Destroy meta and related resources
// return 0;
// }
// for (; iter; ++iter) {
// // do_something(meta, *iter)
// // or do_something(meta, iter->a_member_of_T)
// }
// return 0;
// }
template <typename T>
class TaskIterator;
Start an ExecutionQueue
struct ExecutionQueueOptions {
ExecutionQueueOptions();
// Execute in resident pthread instead of kthread. Default: false.
bool use_pthread;
// Attribute of the kthread which execute runs on. Default: KTHREAD_ATTR_NORMAL
// Kthread will be used when executor = NULL and use_pthread == false.
kthread_attr_t kthread_attr;
// Executor that tasks run on. Default: NULL
// Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
// Executor is in-place(synchronous).
Executor * executor;
};
// Start an ExecutionQueue. If |options| is NULL, the queue will be created with
// default options.
// Returns 0 on success, errno otherwise
// NOTE: type |T| can be non-POD but must be copy-constructible
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>& iter),
void* meta);
The return value is a 64-bit ID, which acts as a weak reference to the ExecutionQueue instance. It allows wait-free O(1) time lookup of the ExecutionQueue. You can copy this ID freely, and even transmit it in RPC calls as a mechanism to locate remote resources.
You must ensure that the lifecycle of the meta pointer extends until the corresponding ExecutionQueue has been completely stopped.
Stop an ExecutionQueue
// Stop the ExecutionQueue.
// After this function is called:
// - All subsequent calls to execution_queue_execute will fail immediately.
// - The executor will invoke |execute| exactly once with TaskIterator::is_queue_stopped()
// returning true, after all pending tasks have been processed. After this invocation,
// it is safe to release the resources referenced by |meta|.
// Returns 0 on success, errno otherwise
template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);
// Wait until the stop task (where Iterator::is_queue_stopped() returns true) has
// been executed
template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);
Both execution_queue_stop and execution_queue_join can be called multiple times safely, with well-defined behavior. execution_queue_stop is thread-safe and can be invoked at any time.
Similar to closing a file descriptor, failing to call execution_queue_stop will result in permanent resource leaks.
Safe timing to release meta: You can release it either when the execute function receives a task with iter.is_queue_stopped() == true, or after execution_queue_join returns. Be careful to avoid double-free errors.
Submit Tasks
struct TaskOptions {
TaskOptions();
TaskOptions(bool high_priority, bool in_place_if_possible);
// Executor will execute high-priority tasks in FIFO order but before all pending
// normal-priority tasks.
// NOTE: We do not guarantee any form of real-time execution, as there may be
// uninterruptible tasks currently in progress.
//
// Default: false
bool high_priority;
// If |in_place_if_possible| is true, execution_queue_execute will invoke the
// execute function immediately instead of scheduling it to a kthread, if possible.
//
// Note: Enabling this flag may cause deadlocks or excessive recursion (e.g.,
// ping-ponging tasks between queues). Ensure your code is free of such issues
// before enabling this option.
//
// Default: false
bool in_place_if_possible;
};
const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(/*high_priority=*/ false, /*in_place_if_possible=*/ false);
const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(/*high_priority=*/ true, /*in_place_if_possible=*/ false);
const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(/*high_priority=*/ false, /*in_place_if_possible=*/ true);
// Thread-safe and Wait-free.
// Execute a task with default TaskOptions (normal task);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename kutil::add_const_reference<T>::type task);
// Thread-safe and Wait-free.
// Execute a task with specified options. e.g
// kthread::execution_queue_execute(queue, task, &kthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, default options (normal task) will be used.
// If |handle| is not NULL, it will be assigned the handle of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename kutil::add_const_reference<T>::type task,
const TaskOptions* options);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename kutil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options,
TaskHandle* handle);
High-priority tasks are also executed in strict FIFO order relative to other high-priority tasks, which differs from ExecMan where the execution order of QueueExecEmergent AsyncContexts is undefined. However, this also means that no task can be inserted ahead of an existing high-priority task in the queue.
Enabling in_place_if_possible eliminates thread scheduling and cache synchronization overhead in contention-free
scenarios. However, it may lead to deadlocks or excessive recursion (e.g., ping-ponging tasks between queues).
Ensure your code is free of such risks before enabling this option.
Cancel a Submitted Task
/// [Thread safe and ABA free] Cancel the corresponding task.
// Returns:
// -1: The task has already been executed or the handle is invalid
// 0: Task canceled successfully
// 1: The task is currently being executed
int execution_queue_cancel(const TaskHandle& h);
A non-zero return value only indicates that the ExecutionQueue has already passed the task to the execute function.
In practice, the task may still be cached in another container within the business logic. Therefore, this does not
guarantee that the logical task has completed execution. You need to implement additional business-level checks to
confirm task termination.