Introduction to asynchronous programming

Daniel Beer
dlbeer@gmail.com
19 Mar 2013

Note: this article is a reformatted version of a document distributed with the libdlb source. The original document can also be viewed online.

What is asynchronous programming?

System calls like read(), write() and sleep() from POSIX are APIs for sequential programming. You call them to perform a single IO operation, and they return when that operation is completed. This is fine for programs with simple IO requirements, but in more complex programs and in servers, it's often required that you be able to carry out and wait for several IO operations concurrently -- you wouldn't want an HTTP server to have to complete an entire request before serving a new client.

There are two popular strategies for doing this:

The strategy described here is one which combines good features from both, and can be implemented without the need for a new language, OS or runtime:

The description above of asynchronous programming would lead you to believe that it's nearly the same as event-based programming. There are a few subtle, but important, differences:

Both of these differences are related to the way in which callbacks are dispatched. An event-based system dispatches from a single thread, whereas an asynchronous system dispatches from a thread pool. This introduces some constraints and rules which must be followed in order to avoid race conditions.

Example: asynchronous counter

/* This program counts to 10, displaying one number per second using
 * asynchronous waits.
 */
#include <stdio.h>
#include <stdlib.h>
#include "io/ioq.h"
#include "io/mailbox.h"

/* The IO queue manager. */
static struct ioq global_queue;

/* A mailbox is a thread-safe object that contains an atomic flags
 * register. We can wait asynchronously for flags to change state, but
 * we're not making use of this feature here -- we just want an atomic
 * variable.
 */
static struct mailbox done_box;

/* This is our asynchronous timer callback. We don't know exactly which
 * thread it will run on, but it'll be one of the threads from the IO
 * queue's thread pool.
 */
static void timer_continue(struct waitq_timer *my_timer)
{
        static int counter = 0;

        /* Every time the timer wait finishes, we increment our counter
         * and display the new value.
         */
        counter++;
        printf("%d\n", counter);

        if (counter >= 10) {
                /* If this is the tenth time we've invoked the callback,
                 * raise a mailbox flag to indicate that we're done.
                 */
                mailbox_raise(&done_box, 1);

                /* The main thread can read the flag, but it doesn't
                 * know that the flag has changed (because we're running
                 * in a different thread). We need to interrupt the call
                 * in the main thread to ioq_iterate() in order to make
                 * it check.
                 */
                ioq_notify(&global_queue);
        } else {
                /* We haven't yet run 10 times. Asynchronously wait once
                 * more. Note that as soon as we call
                 * waitq_timer_wait(), the callback is "live" and may be
                 * invoked. That's ok, because this is the last thing
                 * we're doing in this callback instance, so it's safe
                 * to run another instance immediately.
                 */
                waitq_timer_wait(my_timer, 1000, timer_continue);
        }
}

int main(void)
{
        struct waitq_timer my_timer;

        /* Initialize the IO queue with a single thread in the thread
         * pool. This might fail; for brevity, we just abort rather
         * than handle the error properly.
         */
        if (ioq_init(&global_queue, 1) < 0)
                abort();

        /* Initialize our mailbox. We need to associate it with an IO
         * queue for asynchronous waits to work (although we're not
         * using them here).
         */
        mailbox_init(&done_box, ioq_runq(&global_queue));

        /* Initialize the asynchronous timer by associating it with the
         * IO queue.
         */
        waitq_timer_init(&my_timer, ioq_waitq(&global_queue));

        /* Begin an asynchronous wait on the timer. Only one
         * asynchronous wait may be in progress at a time -- as soon as
         * timer_continue() is invoked, that's our signal that the
         * operation has completed.
         */
        waitq_timer_wait(&my_timer, 1000, timer_continue);

        /* Keep running the IO queue until a flag is raised in the
         * mailbox.
         */
        while (!mailbox_take(&done_box, MAILBOX_ALL_FLAGS))
                ioq_iterate(&global_queue);

        /* Free resources. Note that the timer doesn't acquire resources
         * for itself, and doesn't need to be destroyed.
         */
        mailbox_destroy(&done_box);
        ioq_destroy(&global_queue);
        return 0;
}

Principles

With reference to the example program, note the following principles:

In the example program, thread safety is guaranteed by the fact that we never access my_timer between waitq_timer_wait() and timer_continue(). The second and subsequent calls to waitq_timer_wait() are made in tail position from timer_continue().

This is a common pattern in asynchronous programming when dealing with a single isolated sequence of IO operations: because all requests are made in tail position from callbacks, the sequence of callbacks is effectively single threaded. The following diagram illustrates the execution of code in callbacks and the hidden IO operations:

Thread pool                                  IO manager
=======================================================

+------------+
| Init func  |
+------------+ ====== wait() ======>
                                     +----------------+
                                     | IO operation 1 |
               <===== complete ===== +----------------+
+------------+
| Callback 1 |
+------------+ ====== wait() ======>
                                     +----------------+
                                     | IO operation 2 |
               <===== complete ===== +----------------+
+------------+
| Callback 2 |
+------------+

The callbacks and IO operations are strictly interleaved, with control transfers taking place during requests and completions.
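
The interleaving of requests and completions can be imitated in a few lines of plain C. The following is only a single-threaded sketch: the toy_ names are hypothetical, there is no real IO, and "completion" simply means that the run loop picks up the pending callback. It shows the shape of a strand in which each callback re-arms the wait as its last action.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an IO object: it holds at most one pending
 * operation, represented only by the callback to run on completion.
 */
struct toy_op;
typedef void (*toy_cb_t)(struct toy_op *op);

struct toy_op {
        toy_cb_t pending;       /* NULL when no wait is in progress */
        int      steps;         /* strand state, touched only by callbacks */
};

/* Request a wait. Only one wait may be in progress at a time, and a
 * callback must call this as the last thing it does.
 */
static void toy_wait(struct toy_op *op, toy_cb_t cb)
{
        assert(op->pending == NULL);
        op->pending = cb;
}

/* Callback: do some work, then re-arm in tail position until done. */
static void toy_step(struct toy_op *op)
{
        op->steps++;
        if (op->steps < 3)
                toy_wait(op, toy_step);
}

/* "Complete" pending operations until the strand stops re-arming.
 * Returns the number of callbacks dispatched.
 */
static int toy_run(struct toy_op *op)
{
        int dispatched = 0;

        while (op->pending) {
                toy_cb_t cb = op->pending;

                op->pending = NULL;     /* the operation has completed */
                cb(op);                 /* control passes to the callback */
                dispatched++;
        }

        return dispatched;
}
```

Starting the strand with toy_wait(&op, toy_step) and then calling toy_run(&op) dispatches the callback three times, each invocation beginning only after the previous one has returned.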

While the diagram above shows IO operations as distinct, in practice, multiple IO operations will be efficiently multiplexed in the IO manager using facilities such as epoll().
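
As a rough illustration of multiplexing, the sketch below waits on several descriptors with a single system call. It uses the portable POSIX poll() rather than epoll(), and it is not how libdlb's IO manager is written; the function names wait_any_readable() and demo_two_pipes() are hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_WATCHED     16

/* Wait up to timeout_ms for any of the nfds descriptors to become
 * readable. Returns the index of the first readable descriptor, or -1
 * on timeout or error.
 */
static int wait_any_readable(const int *fds, int nfds, int timeout_ms)
{
        struct pollfd pfds[MAX_WATCHED];
        int i;

        assert(fds != NULL);
        if (nfds < 1 || nfds > MAX_WATCHED)
                return -1;

        for (i = 0; i < nfds; i++) {
                pfds[i].fd = fds[i];
                pfds[i].events = POLLIN;
                pfds[i].revents = 0;
        }

        /* One call blocks on all descriptors at once */
        if (poll(pfds, (nfds_t)nfds, timeout_ms) <= 0)
                return -1;

        for (i = 0; i < nfds; i++)
                if (pfds[i].revents & POLLIN)
                        return i;

        return -1;
}

/* Usage sketch: two pipes, with data written only to the second one.
 * Returns the index reported as readable, or -1 on failure.
 */
static int demo_two_pipes(void)
{
        int a[2], b[2], fds[2];
        int idx = -1;

        if (pipe(a) < 0)
                return -1;
        if (pipe(b) < 0) {
                close(a[0]);
                close(a[1]);
                return -1;
        }

        fds[0] = a[0];
        fds[1] = b[0];

        if (write(b[1], "x", 1) == 1)
                idx = wait_any_readable(fds, 2, 1000);

        close(a[0]);
        close(a[1]);
        close(b[0]);
        close(b[1]);
        return idx;
}
```

An IO manager built this way would dispatch the completion callback associated with each ready descriptor, instead of returning an index to the caller.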

Asynchronous IO primitives

The following header files contain useful IO primitives:

Check the header files for more details. Most of the asynchronous operations are implemented using intrusive data structures in a way which guarantees that they can't fail.
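
To see why an intrusive design removes failure paths, consider a queue whose link field lives inside the caller's own object. Queuing then only rewrites pointers and never allocates memory, so there is nothing that can fail. The sketch below is a generic illustration with hypothetical names (ilist, inode, struct request); it is not libdlb's own list implementation.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical intrusive list node, embedded in the caller's object */
struct inode {
        struct inode *next;
};

struct ilist {
        struct inode *head;
        struct inode *tail;
};

/* Recover the enclosing object from a pointer to its embedded node */
#define ilist_entry(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

static void ilist_init(struct ilist *l)
{
        l->head = NULL;
        l->tail = NULL;
}

/* Appending only links pointers: no allocation, so no failure path */
static void ilist_append(struct ilist *l, struct inode *n)
{
        assert(n != NULL);

        n->next = NULL;
        if (l->tail)
                l->tail->next = n;
        else
                l->head = n;
        l->tail = n;
}

/* Remove and return the oldest node, or NULL if the list is empty */
static struct inode *ilist_pop(struct ilist *l)
{
        struct inode *n = l->head;

        if (!n)
                return NULL;

        l->head = n->next;
        if (!l->head)
                l->tail = NULL;

        return n;
}

/* An object that carries its own list linkage */
struct request {
        int             id;
        struct inode    node;
};
```

The cost of this approach is that the caller must keep each object alive, and off any other list using the same node, for as long as it is queued.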

Mailbox-based patterns

The simplest pattern of asynchronous programming is the "single-strand", or single logical thread consisting of a sequence of asynchronous operations, as shown in the example code above.

More complex and interesting patterns are possible by making use of struct mailbox, defined in io/mailbox.h. Some useful concepts are described in subsections here.

Producer/consumer

The most obvious use of a mailbox is to use it as an IPC primitive. In this case, it may be associated with a shared data structure (in this example, a FIFO queue) to provide more sophisticated communication.

First, a flag is chosen to be associated with the FIFO queue. It doesn't matter which, just as long as it's not used for any other purpose. Our data structures and definitions are:

#define QUEUE_READY_FLAG        MAILBOX_FLAG(0)

thr_mutex_t                     lock;
struct slist                    fifo;
struct mailbox                  mbox;

When the producer wants to send an item via the queue, it does:

/* Safely place the data in the queue */
thr_mutex_lock(&lock);
slist_append(&fifo, &my_data.node);
thr_mutex_unlock(&lock);

/* Wake up the consumer */
mailbox_raise(&mbox, QUEUE_READY_FLAG);

The consumer should repeatedly wait for the flag to be raised, and empty the queue:

/* Attempt to pop a datum from the queue with the
 * lock held.
 */
static struct datum *pop_queue_locked(void)
{
        struct slist_node *n;

        thr_mutex_lock(&lock);
        n = slist_pop(&fifo);
        thr_mutex_unlock(&lock);

        if (n)
                return container_of(n, struct datum, node);

        return NULL;
}

/* Queue handler: call this function once to start the consumption
 * of data.
 */
static void consume_queue(struct mailbox *m)
{
        /* Acknowledge the wake-up request */
        mailbox_take(&mbox, QUEUE_READY_FLAG);

        /* Remove data from the queue. We must loop until the queue
         * is empty to avoid stalling (all wakeups for items in the
         * queue have already been delivered -- we won't be woken up
         * again unless new data arrives).
         */
        for (;;) {
                struct datum *d = pop_queue_locked();

                /* Nothing left? Stop processing and wait again. */
                if (!d)
                        break;

                process_data(d);
        }

        /* Wait again. This must be the last thing we do, in order
         * to maintain a single strand of execution.
         */
        mailbox_wait(&mbox, QUEUE_READY_FLAG, consume_queue);
}
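
The consumer has to drain the queue in a loop because flags are bits, not counters: raising a flag that is already raised changes nothing, so several producer wakeups may be observed as one. The sketch below demonstrates this with a toy flags register built on C11 atomics. The toy_ names are hypothetical and the code is an assumption about the general technique, not libdlb's mailbox implementation.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

typedef unsigned int toy_flags_t;

/* Hypothetical flags register: one bit per event source */
struct toy_mailbox {
        atomic_uint flags;
};

static void toy_mailbox_init(struct toy_mailbox *m)
{
        assert(m != NULL);
        atomic_init(&m->flags, 0);
}

/* Set the given bits. Raising an already-raised bit is a no-op, so
 * repeated raises coalesce into a single pending event.
 */
static void toy_mailbox_raise(struct toy_mailbox *m, toy_flags_t set)
{
        atomic_fetch_or(&m->flags, set);
}

/* Atomically clear the bits in mask and return those that were set */
static toy_flags_t toy_mailbox_take(struct toy_mailbox *m, toy_flags_t mask)
{
        return atomic_fetch_and(&m->flags, ~mask) & mask;
}
```

If a producer raises the same flag twice before the consumer runs, the first toy_mailbox_take() returns the flag and the second returns zero. Two items are in the queue, but only one wakeup is visible, which is why the handler must keep popping until the queue is empty.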

Fork/join

Another useful pattern, making use of the wait_all operation, is using a mailbox to coordinate parallel operations. These operations can be either IO operations or computations (using the run queue).

In this example, we use a mailbox to synchronize with the completion of two parallel tasks:

#define TASK_A_FLAG             MAILBOX_FLAG(0)
#define TASK_B_FLAG             MAILBOX_FLAG(1)

static struct mailbox box;
static struct runq_task task_a;
static struct runq_task task_b;

static void task_a_func(struct runq_task *t)
{
        do_something_a();
        mailbox_raise(&box, TASK_A_FLAG);
}

static void task_b_func(struct runq_task *t)
{
        do_something_b();
        mailbox_raise(&box, TASK_B_FLAG);
}

static void all_done(struct mailbox *m)
{
        /* Tasks A and B have both completed... */
        mailbox_take(&box, TASK_A_FLAG | TASK_B_FLAG);
        /* Process the results here */
}

static void do_both_tasks(void)
{
        /* Asynchronously start both tasks. Both task_a_func and
         * task_b_func can immediately be scheduled in parallel.
         */
        runq_task_exec(&task_a, task_a_func);
        runq_task_exec(&task_b, task_b_func);

        /* Asynchronously wait for both tasks to finish */
        mailbox_wait_all(&box, TASK_A_FLAG | TASK_B_FLAG, all_done);
}

In this example, task_a_func and task_b_func were computations, started via a struct runq_task. However, they could just as easily have been callbacks from an asynchronous IO operation, or a mix of both.
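
The difference between waiting for any flag and waiting for all flags comes down to a mask comparison. The predicates below are a sketch of that condition only, assuming that an ordinary wait fires when any requested flag is raised and that wait_all fires when every requested flag is raised. The names flagset_t, ready_any() and ready_all() are hypothetical.

```c
#include <assert.h>

typedef unsigned int flagset_t;

/* True if at least one of the requested flags has been raised */
static int ready_any(flagset_t raised, flagset_t mask)
{
        assert(mask != 0);
        return (raised & mask) != 0;
}

/* True only if every one of the requested flags has been raised */
static int ready_all(flagset_t raised, flagset_t mask)
{
        assert(mask != 0);
        return (raised & mask) == mask;
}
```

With a mask covering both task flags, ready_all() stays false after only task A has finished and becomes true once task B has raised its flag as well. This is the join point.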

A few rules must be observed for thread-safety:

Reactor

Often, it's useful to express parts of a program in terms of a state machine responding to a serialized sequence of events (in an event-driven system, your entire program would be of this form). In an asynchronously programmed server, you might want to run a state machine for each client, for example.

This can be done by constructing a main loop based around an asynchronous mailbox wait: a callback function wakes up on receipt of a mailbox flag, processes events, and then waits again. Other asynchronous operations started by the main loop run concurrently to perform simple IO tasks, raising flags upon completion.

The general strategy for implementing this is to first identify the IO operations which you will need to perform. These should be as primitive as possible, because all the completion handler is expected to do is report to the main loop that the IO operation has completed.

The main loop itself centres, as indicated above, around a mailbox:

static struct mailbox loop_box;

static void main_loop(struct mailbox *m)
{
        /* Take events from the mailbox */
        const mailbox_flags_t events =
            mailbox_take(&loop_box, MAILBOX_ALL_FLAGS);

        /* React to flags in events... */
        /* (see below for details) */

        /* Wait for more events */
        mailbox_wait(&loop_box, MAILBOX_ALL_FLAGS, main_loop);
}

For each IO operation, you need the following functions and data structures:

/* A unique flag to indicate completion */
#define IO_FLAG_A               MAILBOX_FLAG(0)

/* A state flag to indicate that the operation is in progress (you
 * might prefer to share bits of a bit-field). This flag is accessed
 * only by the main loop, so no locking is required.
 */
static int a_in_progress;

/* Forward declaration of the completion callback defined below */
static void a_complete(void);

/* A function to start the IO operation */
static void begin_a(void)
{
        if (!a_in_progress) {
                io_operation_wait(&my_object, a_complete);
                a_in_progress = 1;
        }
}

/* ...and a callback function. This does nothing except for waking
 * up the main loop via a mailbox flag.
 */
static void a_complete(void)
{
        mailbox_raise(&loop_box, IO_FLAG_A);
}

In the main loop, you need to react to the receipt of IO_FLAG_A:

if (events & IO_FLAG_A) {
        a_in_progress = 0;
        react_to_a();

        /* You might also want to call begin_a(), if this is a
         * continuous process.
         */
}

Note that thread-safety is achieved via the fact that the only functions to run in parallel with the main thread are the IO completion callbacks, and all they do is call mailbox_raise(), which is an entirely safe operation. The end result is a serialized sequence of invocations of main_loop(), each retrieving and responding to a set of events.

The only tricky part is terminating the loop: at the time of termination, it may be that you have IO operations outstanding. When they complete, they will attempt to raise a mailbox flag, so the mailbox can't be destroyed until this happens.

Fortunately, shut-down can be arranged safely provided that you are keeping track of outstanding operations. First, request cancellation of each IO operation which you have recorded as being in progress (cancellation will be specific to the particular type of IO operation). Record that you are in the process of shutting down, and refrain from starting new operations when old ones complete.

As each operation completes, check to see if there are any more outstanding. If no more operations are outstanding, it's safe to terminate by cleaning up data structures and simply returning from the main loop.
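
The bookkeeping for this shut-down procedure can be very small. The sketch below is one possible arrangement, using a hypothetical struct loop_state that is accessed only from the main loop callback and therefore needs no locking. Cancellation itself is left out, since it depends on the type of IO operation.

```c
#include <assert.h>

/* Hypothetical main loop bookkeeping (main loop access only) */
struct loop_state {
        int outstanding;        /* operations started but not completed */
        int shutting_down;      /* set once termination is requested */
};

/* Call before starting an operation. Returns 1 if the operation may be
 * started, or 0 if it must be refused because we are shutting down.
 */
static int loop_try_begin(struct loop_state *s)
{
        if (s->shutting_down)
                return 0;

        s->outstanding++;
        return 1;
}

/* Call when the main loop sees a completion flag. Returns 1 when it is
 * safe to clean up and return without waiting on the mailbox again.
 */
static int loop_complete(struct loop_state *s)
{
        assert(s->outstanding > 0);
        s->outstanding--;

        return s->shutting_down && s->outstanding == 0;
}

/* Request termination. The caller should also request cancellation of
 * each operation recorded as in progress. Returns 1 if nothing is
 * outstanding, in which case clean-up can proceed immediately.
 */
static int loop_begin_shutdown(struct loop_state *s)
{
        s->shutting_down = 1;
        return s->outstanding == 0;
}
```

In the main loop, a return value of 1 from loop_complete() or loop_begin_shutdown() marks the point at which the mailbox can be destroyed, because no completion callback remains that could raise a flag in it.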