Introduction to asynchronous programming
dlbeer@gmail.com
19 Mar 2013
Note: this article is a reformatted version of a document distributed with the libdlb source. The original document can be viewed online here.
What is asynchronous programming?
System calls like read(), write() and sleep() from POSIX are APIs for sequential programming. You call them to perform a single IO operation, and they return when that operation is completed. This is fine for programs with simple IO requirements, but in more complex programs and in servers, it’s often required that you be able to carry out and wait for several IO operations concurrently – you wouldn’t want an HTTP server to have to complete an entire request before serving a new client.
There are two popular strategies for doing this:
- Threads: spawn a new thread for each separable IO operation – perhaps one thread for each client in an HTTP server. Each thread can then be written as a series of sequential IO operations.
  - Pros: simple programming interface, ability to use multiple cores
  - Cons: significant OS overhead if each thread is largely IO bound
- Events: wait for an IO to become ready by registering a callback. When IO becomes possible, the callback is invoked from a single-threaded event loop and then performs the IO.
  - Pros: very low OS overhead, low latency
  - Cons: unable to make use of multiple cores, more difficult programming interface due to inversion of control
The strategy described here is one which combines good features from both, and can be implemented without the need for a new language, OS or runtime:
- Asynchronous programming: begin an IO operation by registering a callback. The IO is carried out in the background and when complete, the callback is invoked in one of several threads in a pool.
  - Pros: very low OS overhead, low latency, able to make use of multiple cores
  - Cons: difficult programming interface (similar to events)
The description above of asynchronous programming would lead you to believe that it’s nearly the same as event-based programming. There are a few subtle, but important, differences:
- In an event-based system, pending events may be cancelled, and if this is done, the callback is prevented from being invoked. This isn’t possible to do in a deterministic way in an asynchronous system (but cancellation of a kind is possible).
- In an event-based system, an event may be periodic (for example, an interval timer). Asynchronous operations maintain a strict one-to-one relationship between requests and callbacks.
Both of these differences are related to the way in which callbacks are dispatched. An event-based system dispatches from a single thread, whereas an asynchronous system dispatches from a thread pool. This introduces some constraints and rules which must be followed in order to avoid race conditions.
Example: asynchronous counter
/* This program counts to 10, displaying one number per second using
 * asynchronous waits.
 */
#include <stdio.h>
#include <stdlib.h>	/* for abort() */
#include "io/ioq.h"
#include "io/mailbox.h"

/* The IO queue manager. */
static struct ioq global_queue;

/* A mailbox is a thread-safe object that contains an atomic flags
 * register. We can wait asynchronously for flags to change state, but
 * we're not making use of this feature here -- we just want an atomic
 * variable.
 */
static struct mailbox done_box;

/* This is our asynchronous timer callback. We don't know exactly which
 * thread it will run on, but it'll be one of the threads from the IO
 * queue's thread pool.
 */
static void timer_continue(struct waitq_timer *my_timer)
{
	static int counter = 0;

	/* Every time the timer wait finishes, we increment our counter
	 * and display the new value.
	 */
	counter++;
	printf("%d\n", counter);

	if (counter >= 10) {
		/* If this is the tenth time we've invoked the callback,
		 * raise a mailbox flag to indicate that we're done.
		 */
		mailbox_raise(&done_box, 1);

		/* The main thread can read the flag, but it doesn't
		 * know that the flag has changed (because we're running
		 * in a different thread). We need to interrupt the call
		 * in the main thread to ioq_iterate() in order to make
		 * it check.
		 */
		ioq_notify(&global_queue);
	} else {
		/* We haven't yet run 10 times. Asynchronously wait once
		 * more. Note that as soon as we call
		 * waitq_timer_wait(), the callback is "live" and may be
		 * invoked. That's ok, because this is the last thing
		 * we're doing in this callback instance, so it's safe
		 * to run another instance immediately.
		 */
		waitq_timer_wait(my_timer, 1000, timer_continue);
	}
}
int main(void)
{
	struct waitq_timer my_timer;

	/* Initialize the IO queue with a single thread in the thread
	 * pool. This might fail, but we're omitting error handling for
	 * brevity.
	 */
	if (ioq_init(&global_queue, 1) < 0)
		abort();

	/* Initialize our mailbox. We need to associate it with an IO
	 * queue for asynchronous waits to work (although we're not
	 * using them here).
	 */
	mailbox_init(&done_box, ioq_runq(&global_queue));

	/* Initialize the asynchronous timer by associating it with the
	 * IO queue.
	 */
	waitq_timer_init(&my_timer, ioq_waitq(&global_queue));

	/* Begin an asynchronous wait on the timer. Only one
	 * asynchronous wait may be in progress at a time -- as soon as
	 * timer_continue() is invoked, that's our signal that the
	 * operation has completed.
	 */
	waitq_timer_wait(&my_timer, 1000, timer_continue);

	/* Keep running the IO queue until a flag is raised in the
	 * mailbox.
	 */
	while (!mailbox_take(&done_box, MAILBOX_ALL_FLAGS))
		ioq_iterate(&global_queue);

	/* Free resources. Note that the timer doesn't acquire resources
	 * for itself, and doesn't need to be destroyed.
	 */
	mailbox_destroy(&done_box);
	ioq_destroy(&global_queue);

	return 0;
}
Principles
With reference to the example program, note the following principles:
One-to-one correspondence: ONE request = ONE callback, always. Note that waitq_timer_wait() is invoked 10 times with timer_continue() specified as a callback. timer_continue() is also called 10 times.
Cancellation: cancellation wasn’t shown above, but it can be done, and it doesn’t work the same way as in an event-based program. After a call to waitq_timer_wait(), we could have cancelled the timer with:
waitq_timer_cancel(&my_timer);
What would this have done? It would have caused timer_continue() to be invoked as soon as possible, rather than after 1000 ms had elapsed – so the one-to-one relationship between requests and callbacks is preserved. Note that it’s the caller’s problem to distinguish between “real” callbacks and cancellations. This could be done either by setting a (thread-safe) flag before waitq_timer_cancel(), or by checking the clock against the expected deadline in the timer callback.
This is known as asynchronous cancellation, whereas the event-based kind (where the callback is prevented from being invoked) is known as synchronous cancellation. We can’t do synchronous cancellation, because at the time you call waitq_timer_cancel(), the callback might already be executing in a different thread!
Since cancellation is often performed from a different logical thread than the one which will run the callback, it sometimes happens that the cancellation request occurs after the callback has already executed. If you intend to re-use the IO object, this is a possible source of race conditions if you aren’t careful.
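The flag-based way of telling a cancellation apart from a real expiry can be sketched with C11 atomics. This is only an illustration of the idea and does not call libdlb: struct my_wait, my_wait_begin(), my_wait_cancel() and my_wait_classify() are hypothetical names, and the place where waitq_timer_cancel() would be called is marked with a comment.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical bookkeeping for one outstanding wait. In real code this
 * would live next to the struct waitq_timer that it describes.
 */
struct my_wait {
	atomic_bool cancel_requested;
};

enum my_wait_result {
	MY_WAIT_EXPIRED,
	MY_WAIT_CANCELLED
};

/* Call immediately before starting each new wait. */
static void my_wait_begin(struct my_wait *w)
{
	assert(w != NULL);
	atomic_store(&w->cancel_requested, false);
}

/* May be called from any thread. The flag is set *before* the
 * cancellation request is made, so a callback that runs because of
 * the request is guaranteed to see it.
 */
static void my_wait_cancel(struct my_wait *w)
{
	atomic_store(&w->cancel_requested, true);
	/* waitq_timer_cancel(&the_timer) would go here */
}

/* Call first thing in the completion callback. Reads and clears the
 * flag in one atomic step.
 */
static enum my_wait_result my_wait_classify(struct my_wait *w)
{
	if (atomic_exchange(&w->cancel_requested, false))
		return MY_WAIT_CANCELLED;

	return MY_WAIT_EXPIRED;
}
```

Note that this does not remove the re-use race: if the cancel request arrives after the callback has already classified its completion and started a new wait, the request lands on the new wait instead.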
Thread safety: IO objects, such as struct waitq_timer, are not thread-safe themselves, although they do interact with “hidden” threads in the IO system. What this means in practice is that you must avoid accessing the same IO object from different threads simultaneously.
There is one exception to this, and that is cancellation: you may call waitq_timer_cancel(), for example, from any thread.
Limited subscription: an IO object may have at most one outstanding IO operation in progress at any time.
In the example program, thread safety is guaranteed by the fact that we never access my_timer between waitq_timer_wait() and timer_continue(). The second, and subsequent calls to waitq_timer_wait(), are done in tail position from timer_continue().
This is a common pattern in asynchronous programming when dealing with a single isolated sequence of IO operations: because all requests are made in tail position from callbacks, the sequence of callbacks is effectively single threaded. The following diagram illustrates the execution of code in callbacks and the hidden IO operations:
Thread pool                          IO manager
=======================================================
+------------+
| Init func  |
+------------+ ====== wait() ======>
                                     +----------------+
                                     | IO operation 1 |
               <===== complete ===== +----------------+
+------------+
| Callback 1 |
+------------+ ====== wait() ======>
                                     +----------------+
                                     | IO operation 2 |
               <===== complete ===== +----------------+
+------------+
| Callback 2 |
+------------+
The callbacks and IO operations are strictly interleaved, with control transfers taking place during requests and completions.
While the diagram above shows IO operations as distinct, in practice, multiple IO operations will be efficiently multiplexed in the IO manager using facilities such as epoll().
Asynchronous IO primitives
The following header files contain useful IO primitives:
runq.h:
struct runq_task is the simplest IO primitive. A request to runq_task_exec() invokes the given callback on the thread pool as soon as possible. This is useful if you want to defer work, or as an easy way to parallelize operations (in conjunction with the mailbox, described below).
waitq.h:
struct waitq_timer is a one-shot timer. You can asynchronously wait by specifying a time period in milliseconds and a callback to be invoked after the time period has elapsed. Timers support asynchronous cancellation.
ioq.h:
struct ioq_fd is a file descriptor monitor. After initializing it by associating it with a file descriptor and an IO queue, you can asynchronously wait for readability, writability, or error conditions on the selected file descriptor. These objects support asynchronous cancellation. Note two important restrictions:
- You may not associate more than one struct ioq_fd with the same file descriptor.
- You may not have more than one outstanding wait in progress at a time. If you need to wait for a new condition when you’re already waiting for an old one, you must first asynchronously cancel the old wait operation.
mailbox.h:
struct mailbox is an asynchronous IPC primitive, intended for signalling between logical threads. It consists of a thread-safe 32-bit register, each bit of which is treated as a “flag”. Two atomic operations are possible:
- raise(): set any subset of flags in the mailbox, leaving the others untouched.
- take(): clear any subset of flags in the mailbox, leaving the others untouched. The state of the mailbox, prior to clearing, is returned.
These two operations alone make it useful as a simple atomic variable. However, to allow actual IPC, two asynchronous wait operations are provided:
- wait(): given a subset of flags and a callback, invoke the callback when ANY of the flags in the subset become raised.
- wait_all(): given a subset of flags and a callback, invoke the callback when ALL of the flags in the subset become raised.
The wait operations are “level triggered”: if the condition waited for is already true at the time of the call, the callback will be invoked immediately (but still asynchronously).
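The semantics of raise() and take(), and the two wait conditions, can be modelled with a single atomic word. The following is a minimal sketch using C11 atomics; it is not the libdlb implementation, and struct flagbox and the flagbox_*() functions are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the flags register inside a mailbox. */
struct flagbox {
	_Atomic uint32_t flags;
};

static void flagbox_init(struct flagbox *b)
{
	assert(b != NULL);
	atomic_init(&b->flags, 0);
}

/* raise(): set the flags in `set`, leaving the others untouched. */
static void flagbox_raise(struct flagbox *b, uint32_t set)
{
	atomic_fetch_or(&b->flags, set);
}

/* take(): clear the flags in `mask`, leaving the others untouched,
 * and return the state as it was prior to clearing.
 */
static uint32_t flagbox_take(struct flagbox *b, uint32_t mask)
{
	return atomic_fetch_and(&b->flags, (uint32_t)~mask);
}

/* Condition for wait(): at least one flag in `mask` is raised. */
static int flagbox_any(uint32_t state, uint32_t mask)
{
	return (state & mask) != 0;
}

/* Condition for wait_all(): every flag in `mask` is raised. */
static int flagbox_all(uint32_t state, uint32_t mask)
{
	return (state & mask) == mask;
}
```

Because take() returns the prior state in the same atomic step that clears it, a flag raised by another thread is either reported by this take() or left in place for the next one. It is never lost.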
Check the header files for more details. Most of the asynchronous operations are implemented using intrusive data structures in a way which guarantees that they can’t fail.
Mailbox-based patterns
The simplest pattern of asynchronous programming is the “single-strand”, or single logical thread consisting of a sequence of asynchronous operations, as shown in the example code above.
More complex and interesting patterns are possible by making use of struct mailbox, defined in io/mailbox.h. Some useful concepts are described in subsections here.
Producer/consumer
The most obvious use of a mailbox is to use it as an IPC primitive. In this case, it may be associated with a shared data structure (in this example, a FIFO queue) to provide more sophisticated communication.
First, a flag is chosen to be associated with the FIFO queue. It doesn’t matter which, just as long as it’s not used for any other purpose. Our data structures and definitions are:
#define QUEUE_READY_FLAG	MAILBOX_FLAG(0)

thr_mutex_t lock;
struct slist fifo;
struct mailbox mbox;
When the producer wants to send an item via the queue, it does:
/* Safely place the data in the queue */
thr_mutex_lock(&lock);
slist_append(&fifo, &my_data.node);
thr_mutex_unlock(&lock);

/* Wake up the consumer */
mailbox_raise(&mbox, QUEUE_READY_FLAG);
The consumer should repeatedly wait for the flag to be raised, and empty the queue:
/* Attempt to pop a datum from the queue with the
 * lock held.
 */
static struct datum *pop_queue_locked(void)
{
	struct slist_node *n;

	thr_mutex_lock(&lock);
	n = slist_pop(&fifo);
	thr_mutex_unlock(&lock);

	if (n)
		return container_of(n, struct datum, node);

	return NULL;
}

/* Queue handler: call this function once to start the consumption
 * of data.
 */
static void consume_queue(struct mailbox *m)
{
	/* Acknowledge the wake-up request */
	mailbox_take(&mbox, QUEUE_READY_FLAG);

	/* Remove data from the queue. We must loop until the queue
	 * is empty to avoid stalling (all wakeups for items in the
	 * queue have already been delivered -- we won't be woken up
	 * again unless new data arrives).
	 */
	for (;;) {
		struct datum *d = pop_queue_locked();

		/* Nothing left? Stop processing and wait again. */
		if (!d)
			break;

		process_data(d);
	}

	/* Wait again. This must be the last thing we do, in order
	 * to maintain a single strand of execution.
	 */
	mailbox_wait(&mbox, QUEUE_READY_FLAG, consume_queue);
}
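The order of operations in the consumer matters: the flag is acknowledged first and the queue is drained afterwards. If the consumer drained first and acknowledged last, an item enqueued in between would have its wake-up cleared by the late acknowledgement and would sit in the queue unnoticed. The sketch below shows the same ordering without libdlb, assuming a POSIX mutex, a C11 atomic flag in place of the mailbox, and a small fixed-size queue of integers; all names in it are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAPACITY 16

/* Hypothetical stand-ins: a bounded FIFO protected by a mutex, and a
 * single atomic flag playing the role of QUEUE_READY_FLAG.
 */
static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;
static int q_items[QUEUE_CAPACITY];
static size_t q_head, q_count;
static atomic_bool q_ready;
static long processed_total;

/* Producer: enqueue under the lock, then raise the flag. Returns 0 on
 * success, or -1 if the queue is full.
 */
static int produce(int value)
{
	int result = -1;

	pthread_mutex_lock(&q_lock);
	if (q_count < QUEUE_CAPACITY) {
		q_items[(q_head + q_count) % QUEUE_CAPACITY] = value;
		q_count++;
		result = 0;
	}
	pthread_mutex_unlock(&q_lock);

	if (result == 0)
		atomic_store(&q_ready, true);

	return result;
}

/* Pop one item with the lock held. Returns false if the queue is empty. */
static bool pop_queue_locked(int *out)
{
	bool have = false;

	assert(out != NULL);
	pthread_mutex_lock(&q_lock);
	if (q_count > 0) {
		*out = q_items[q_head];
		q_head = (q_head + 1) % QUEUE_CAPACITY;
		q_count--;
		have = true;
	}
	pthread_mutex_unlock(&q_lock);

	return have;
}

static void process_data(int value)
{
	processed_total += value;
}

/* Consumer: acknowledge first, then drain until empty. Returns the
 * number of items processed.
 */
static size_t consume_queue(void)
{
	size_t n = 0;
	int value;

	atomic_store(&q_ready, false);
	while (pop_queue_locked(&value)) {
		process_data(value);
		n++;
	}

	return n;
}
```

With this ordering, the worst case is a spurious wake-up: the flag is raised for an item that the current drain has already picked up, and the next invocation finds the queue empty.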
Fork/join
Another useful pattern, making use of the wait_all operation, is using a mailbox to coordinate parallel operations. These operations can be either IO operations or computations (using the run queue).
In this example, we use a mailbox to synchronize with the completion of two parallel tasks:
#define TASK_A_FLAG	MAILBOX_FLAG(0)
#define TASK_B_FLAG	MAILBOX_FLAG(1)

static struct mailbox box;
static struct runq_task task_a;
static struct runq_task task_b;

static void task_a_func(struct runq_task *t)
{
	do_something_a();
	mailbox_raise(&box, TASK_A_FLAG);
}

static void task_b_func(struct runq_task *t)
{
	do_something_b();
	mailbox_raise(&box, TASK_B_FLAG);
}

static void all_done(struct mailbox *m)
{
	/* Tasks A and B have both completed... */
	mailbox_take(&box, TASK_A_FLAG | TASK_B_FLAG);

	/* Process the results here */
}

static void do_both_tasks(void)
{
	/* Asynchronously start both tasks. Both task_a_func and
	 * task_b_func can immediately be scheduled in parallel.
	 */
	runq_task_exec(&task_a, task_a_func);
	runq_task_exec(&task_b, task_b_func);

	/* Asynchronously wait for both tasks to finish */
	mailbox_wait_all(&box, TASK_A_FLAG | TASK_B_FLAG, all_done);
}
In this example, task_a_func and task_b_func were computations, started via a struct runq_task. However, they could just as easily have been callbacks from an asynchronous IO operation, or a mix of both.
A few rules must be observed for thread-safety:
- task_a_func and task_b_func can run in parallel, and must not share data without correct locking.
- Any data used by either task can’t be touched until they have finished. In practice, this means avoiding touching any of the task’s data between the call to initiate the task (runq_task_exec) and the callback which indicates that both tasks have completed (all_done).
- Since mailbox_raise() may result in the execution of all_done(), it is the last statement in the task callbacks.
- Similarly, mailbox_wait_all() may also result in the execution of all_done(), and it is again the last statement in the initiating function.
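To see why the raise has to come last, it helps to look at how a join can be built from an atomic flag word. In the sketch below, whichever task completes the set runs the join itself, from inside its own "raise". This is an assumption-laden model written with C11 atomics, not the way libdlb dispatches callbacks (which go through the run queue); JOIN_A_FLAG, JOIN_B_FLAG, task_finished() and join_count are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define JOIN_A_FLAG	(UINT32_C(1) << 0)
#define JOIN_B_FLAG	(UINT32_C(1) << 1)
#define JOIN_ALL	(JOIN_A_FLAG | JOIN_B_FLAG)

static _Atomic uint32_t done_flags;

/* Counts how many times the join has run. It is only written by the
 * single caller that completes the set.
 */
static int join_count;

static void all_done(void)
{
	join_count++;
}

/* Call as the last statement of each task. The atomic OR returns the
 * previous state, so exactly one caller observes the transition from
 * "not all set" to "all set", and only that caller runs the join.
 */
static void task_finished(uint32_t flag)
{
	uint32_t prev;

	assert((flag & ~JOIN_ALL) == 0);
	prev = atomic_fetch_or(&done_flags, flag);

	if (prev != JOIN_ALL && (prev | flag) == JOIN_ALL)
		all_done();
}
```

Anything a task did after task_finished() could overlap with the join, which is the same hazard the rules above guard against.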
Reactor
Often, it’s useful to express parts of a program in terms of a state machine responding to a serialized sequence of events (in an event-driven system, your entire program would be of this form). In an asynchronously programmed server, you might want to run a state machine for each client, for example.
This can be done by constructing a main loop based around an asynchronous mailbox wait: a callback function wakes up on receipt of a mailbox flag, processes events, and then waits again. Other asynchronous operations started by the main loop run concurrently to perform simple IO tasks, raising flags upon completion.
The general strategy for implementing this is to first identify the IO operations which you will need to perform. These should be as primitive as possible, because all the completion handler is expected to do is report to the main loop that the IO operation has completed.
The main loop itself centres, as indicated above, around a mailbox:
static struct mailbox loop_box;

static void main_loop(void)
{
	/* Take events from the mailbox */
	const mailbox_flags_t events =
		mailbox_take(&loop_box, MAILBOX_ALL_FLAGS);

	/* React to flags in events... */
	/* (see below for details) */

	/* Wait for more events */
	mailbox_wait(&loop_box, MAILBOX_ALL_FLAGS, main_loop);
}
For each IO operation, you need the following functions and data structures:
/* A unique flag to indicate completion */
#define IO_FLAG_A	MAILBOX_FLAG(0)

/* A state flag to indicate that the operation is in progress (you
 * might prefer to share bits of a bit-field). This flag is accessed
 * only by the main loop, so no locking is required.
 */
static int a_in_progress;

/* Forward declaration of the completion callback defined below */
static void a_complete(void);

/* A function to start the IO operation */
static void begin_a(void)
{
	if (!a_in_progress) {
		io_operation_wait(&my_object, a_complete);
		a_in_progress = 1;
	}
}

/* ...and a callback function. This does nothing except for waking
 * up the main loop via a mailbox flag.
 */
static void a_complete(void)
{
	mailbox_raise(&loop_box, IO_FLAG_A);
}
In the main loop, you need to react to the receipt of IO_FLAG_A:
if (events & IO_FLAG_A) {
	a_in_progress = 0;
	react_to_a();

	/* You might also want to call begin_a(), if this is a
	 * continuous process.
	 */
}
Note that thread-safety is achieved via the fact that the only functions to run in parallel with the main thread are the IO completion callbacks, and all they do is call mailbox_raise(), which is an entirely safe operation. The end result is a serialized sequence of invocations of main_loop(), each retrieving and responding to a set of events.
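When a main loop handles several IO operations, the per-flag reactions can be collected in a table and applied to one snapshot of the events. The following sketch shows one way to do that in plain C. The flag values, the table layout and every name in it (EV_A, EV_B, struct reaction, dispatch_events() and so on) are assumptions made for illustration, and the reactions only count invocations.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define EV_A	(UINT32_C(1) << 0)
#define EV_B	(UINT32_C(1) << 1)

/* Per-operation state, touched only by the main loop. */
static int a_in_progress, b_in_progress;
static int a_reactions, b_reactions;

static void react_to_a(void) { a_reactions++; }
static void react_to_b(void) { b_reactions++; }

/* One table entry per IO operation known to the loop. */
struct reaction {
	uint32_t flag;
	int *in_progress;
	void (*react)(void);
};

static const struct reaction reactions[] = {
	{ EV_A, &a_in_progress, react_to_a },
	{ EV_B, &b_in_progress, react_to_b },
};

/* Handle every flag present in one snapshot of the mailbox. Returns
 * the number of reactions that were run.
 */
static size_t dispatch_events(uint32_t events)
{
	size_t i, handled = 0;

	for (i = 0; i < sizeof(reactions) / sizeof(reactions[0]); i++) {
		if (!(events & reactions[i].flag))
			continue;

		assert(reactions[i].react != NULL);
		*reactions[i].in_progress = 0;
		reactions[i].react();
		handled++;
	}

	return handled;
}
```

In a real main loop the argument would be the value returned by mailbox_take(), and dispatch_events() would be called between the take and the next wait.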
The only tricky part is terminating the loop: at the time of termination, it may be that you have IO operations outstanding. When they complete, they will attempt to raise a mailbox flag, so the mailbox can’t be destroyed until this happens.
Fortunately, shut-down can be arranged safely provided that you are keeping track of outstanding operations. First, request cancellation of each IO operation which you have recorded as being in progress (cancellation will be specific to the particular type of IO operation). Record that you are in the process of shutting down, and refrain from starting new operations when old ones complete.
As each operation completes, check to see if there are any more outstanding. If no more operations are outstanding, it’s safe to terminate by cleaning up data structures and simply returning from the main loop.
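The bookkeeping for this shut-down procedure is small. A minimal sketch, assuming the state is only ever touched from the main loop (so no locking is needed), might look like the following. struct loop_state and the loop_*() functions are hypothetical names; the actual cancellation calls depend on the type of each IO object and are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical bookkeeping for one reactor. Accessed only from the
 * main loop.
 */
struct loop_state {
	unsigned int outstanding;
	bool shutting_down;
};

/* Call before starting an IO operation. Returns false if we are
 * shutting down, in which case the operation must not be started.
 */
static bool loop_op_begin(struct loop_state *s)
{
	assert(s != NULL);
	if (s->shutting_down)
		return false;

	s->outstanding++;
	return true;
}

/* Record that shut-down has begun. The caller should then request
 * cancellation of every operation in progress. Returns true if
 * nothing is outstanding and clean-up can happen right away.
 */
static bool loop_shutdown_request(struct loop_state *s)
{
	s->shutting_down = true;
	return s->outstanding == 0;
}

/* Call from the main loop once per completion flag received. Returns
 * true when it is safe to clean up and return without waiting again.
 */
static bool loop_op_complete(struct loop_state *s)
{
	assert(s->outstanding > 0);
	s->outstanding--;

	return s->shutting_down && s->outstanding == 0;
}
```

Since cancelled operations still deliver their callbacks, every loop_op_begin() is eventually matched by a loop_op_complete(), and the mailbox is only destroyed after the last one.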