Implementing cooperative multitasking in Rust

Table of Contents

I've implemented coroutines and python like generators in Rust (with a bit of assembly). This code was done as a learning exercise more than as something intended for production. There are already several libraries in Rust that do approximately the same thing. I believe they all call out to the hand written assembly in Boost's context library instead of writing it themselves.

1 The (amd64 System V) ABI

The ABI specifies how functions are called at an assembly level. The most important part of this is specifying how the call stack works. Each function called gets a stack frame starting where rsp (the stack pointer register) points when it is called, which is guaranteed to be 16 byte aligned 1. It can allocate space for local variables by decrementing rsp (the stack grows towards lower addresses), and then writing to where it points to. The operating system is free to do with whatever memory it wants above (that is, at lower valued addresses) than rsp, including writing to them or unmapping the memory so access results in segfaults 2.

typical_stack_frame.png

When calling a function we push the address of the instruction to return to onto the stack, store arguments in the right places (for our purposes it suffices to know that the first 3 integer/pointer arguments get put in registers rdi, rsi, and rdx). And then jump to the first instruction in the function called. When returning from a function we pop off the return address from the stack (leaving rsp pointing right before it), and jump to the address it contains. Called functions must ensure that rbx, rbp, r12, r13, r14, r15, the MXCSR control register, and the x87 control word have the same value when they return as when they were called (these are known as callee save registers), and are free to change the value of any other register.

2 Using multiple stacks

To implement coroutines and generates we are going to use multiple call stacks. If you consider the ABI, there is nothing restricting you to using rsp exactly as intended (incrementing/decrementing as a pointer to the top of a single stack), it just always needs to point to some memory with sufficient allocated space "above" it (at lower addresses) so that it doesn't overflow into anything bad. In particular it's possible to "switch" stacks by changing rsp from pointing to the top of one stack to the top of another.

In it's simplest form this could look like

mov     rsp, <new stack>  ; set the stack pointer to point at the new stack
jmp     <some function>   ; and start executing some function

At this point some function will execute just fine on a new stack, as long as it doesn't return. As it turns out it's easy to specify that it should never return in rust, we just give it a return type of !. We also want some way of actually calling this code (as well as a consistent environment for the next step), so we are going to wrap it in a function. Passing in the address of the bottom of the stack as the first parameter (rdi), and the address of the function to call as the second parameter (rsi).

mov     rsp, rdi
jmp     rsi

To be actually useful we are going to want some way of continuing on the original stack, otherwise this just turns into a fancy way to get a bigger stack 3. To do this we will store the callee save registers onto the stack, and pass the pointer to the current stack to the called function.

sub     rsp, 64           ; Allocate space on the stack for the callee save registers
mov     [rsp], rbx        ; copy all the callee save registers into it
mov     [rsp + 8],  rbp
mov     [rsp + 16], r12
mov     [rsp + 24], r13
mov     [rsp + 32], r14
mov     [rsp + 40], r15
stmxcsr [rsp + 48]        ; We need to use special instructions for the control word registers
fnstcw  [rsp + 52]        ; but these are both effectively just mov instructions.

mov     rax, rsp          ; swap the address of the current stack (rsp) and new stack (rdi, the first argument)
mov     rsp, rdi
mov     rdi, rax

jmp     rsi               ; jump to the new function

This is now almost the start function I actually want, the only addition is it turns out to be very useful to forward an argument to the function (a pointer to some data that we can use to pass arguments and move objects into the new stack).

sub     rsp, 64
mov     [rsp], rbx
mov     [rsp + 8],  rbp
mov     [rsp + 16], r12
mov     [rsp + 24], r13
mov     [rsp + 32], r14
mov     [rsp + 40], r15
stmxcsr [rsp + 48]
fnstcw  [rsp + 52]

mov     rax, rsp
mov     rsp, rdi
mov     rdi, rax

mov     rax, rsi          ; save the address we want to jump to
mov     rsi, rdx          ; move the pointer we are forwarding from rdx (arg 3) to rsi (arg 2).

jmp     rax               ; jump to the new function, which we saved in rax

To switch back from the new stack we can switch rsp back to the stack pointer we were passed, restore the callee save registers, and return. I'm going to wrap this code in a function, which expects that pointer as an argument (in rdi). Note that the return statement will pop the return pointer from the stack we just switched to and start executing there, not the stack where this "return" function (which I have called exit_to) is called from.

mov     rsp, rdi          ; Switch stack pointer back

mov     rbx, [rsp]        ; copy all the callee save registers back from the stack
mov     rbp, [rsp + 8]
mov     r12, [rsp + 16]
mov     r13, [rsp + 24]
mov     r14, [rsp + 32]
mov     r15, [rsp + 40]
ldmxcsr [rsp + 48]        ; Again we need to use special instructions for the control word 
fldcw   [rsp + 52]        ; registers but these are both effectively just mov instructions.
add     rsp, 64           ; free the space we allocated on the stack

ret                       ; pop and jump to the return address 

Like our initial implementation of switching to a new stack, this gives no way to switch back, though in this case it still ends up being a useful function. We can combine the two functions to make a switch function. The only question is what to do with the original stack pointer, since we aren't calling a function we can pass it to, and returning it ends up being fairly inconvenient. Instead we'll take a second argument as the address at which we should store the stack pointer (the first argument being the stack pointer to switch to).

            ; Save callee save registers as in start
sub     rsp, 64
mov     [rsp], rbx
mov     [rsp + 8],  rbp
mov     [rsp + 16], r12
mov     [rsp + 24], r13
mov     [rsp + 32], r14
mov     [rsp + 40], r15
stmxcsr [rsp + 48] 
fnstcw  [rsp + 52]

mov     [rsi], rsp        ; Save original stack pointer
mov     rsp, rdi          ; Switch into new stack

            ; Restore callee save registers and return as in exit_to
mov     rbx, [rsp]
mov     rbp, [rsp + 8]
mov     r12, [rsp + 16]
mov     r13, [rsp + 24]
mov     r14, [rsp + 32]
mov     r15, [rsp + 40]
ldmxcsr [rsp + 48]
fldcw   [rsp + 52]
add     rsp, 64

ret

3 Wrapping primitives in Rust

3.1 Writing assembly in rust

This section (3.1) is skippable. It is intended primarily as feedback to help drive rust development, and not targeted to as general an audience. The current solutions are unstable, buggy, and likely to change in the near future.

We've written some cool assembly above, but now we need a way to call it from our actual code. There are three ways to call assembly in rust. Compile it separately and link it like any other object file (like if one were linking C code), use naked functions and the asm! macro, or use the global_asm! macro.

I'd rather avoid the first if possible, since it means requiring a separate compiler, and having the assembly source far removed from the rust headers.

I initially went with the second, but rust issue 34043 means that it will result in undefined behaviour. I also encountered an issue where the compiler would inline my assembly, and rely on me not clobbering registers I didn't tell it I would clobber. So I would have to tell the compiler I clobber every single caller save register to avoid undefined behaviour (as does any assembly that calls a user supplied function!?). Which apart from being a pain, doesn't seem to be possible if I want to support future CPUs with yet more extensions. (The specific clobber code gen issue could be worked around with #[inline(never)], but I doubt that's a robust solution).

So I've ended up using the third method. This has three minor drawbacks. It only supports AT&T assembly syntax (the technical issue previously mentioned), names aren't mangled so we have to worry about name collisions, and I have to repeat the function name 3 times when defining it. I tried to use a macro (code below) to avoid the third drawback, but it turns out that trying to pass the output of concat! to global_asm! will result in an error "expected a literal" so (at least without procedural macros) that isn't possible.

// Warning, untested and doesn't work
macro_rules! asm_function {
    (pub unsafe extern "C" fn $name:ident($($arg:ident : $arg_type:ty),*) -> $ret_type:ty { $asm:expr } ) => (
        extern "C" {
            pub fn $name($($arg: $arg_type),*) -> $ret_type;
        }

        global_asm!(
            concat!(
                "   .global ", $name, "\n",
                $name, ":\n",
                $asm
            )
        );
    );
    (pub unsafe extern "C" fn $name:ident($($arg:ident : $arg_type:ty),*) { $asm: expr } ) => (
        asm_function!(pub unsafe extern "C" $name ($($arg : $arg_type),*)) -> () $asm
    )
}

3.2 Function signatures

Regardless of the method chosen for writing assembly, we will effectively be writing rust function signatures to tell the compiler how to call the assembly. They look like the following

// We make a newtype here for stack pointers to help prevent them being
// misused (particularly useful is that they are given move semantics, so
// users can't make copies and try to switch to the same stack pointer twice).
#[repr(C)]
pub struct StackPtr(usize);

extern "C" {
    // Our start function takes 3 arguments: A pointer to the end of a buffer that 
    // we will use as a stack. A pointer to a function that we will call, which in turn
    //  - Takes two arguments (a pointer to the stack we called it on, and a pointer
    //    to some argument).
    //  - Must never return.
    // Finally we take a pointer to the arguments to pass to the start function.

    // Unlike exit_to this function can return, at least as the calling code can observe, 
    // when a stack uses `exit_to` or `switch` to switch back to us. It returns no value.

    // It's already unsafe to call this function, so it doesn't make it any more unsafe 
    // to call a user supplied function which is also unsafe. Hence we allow for that.
    pub fn start(buf: *mut u8, 
                 to_call: unsafe extern "C" fn(StackPtr, *mut()) -> !, 
                 args: *mut ());

    // The simplest function. We take a stack pointer (that we will switch to), and we 
    // never  return to this stack again (we can't since we forget where the top of it is).
    pub fn exit_to(stack: StackPtr) -> !;

    // Takes a stack pointer to switch to, and a pointer to which we are going to store 
    // our current stack pointer. Like start this function can return indirectly.
    pub fn switch(target: StackPtr, save: *mut StackPtr);
}

4 Wrapping start

Both exit_to and switch are already somewhat convenient to use from rust code (at least for implementing better abstractions like coroutines). start not so much since we have to deal with casting pointers around, manual memory management, etc. We can do better.

It also turns out that actually allocating a stack is not entirely trivial, since we need it to be 16 byte aligned. Or rather we would need the return address at the start of the stack frame, to be 16 byte aligned. So we need it to be offset by 8 bytes from a 16 byte aligned address. We could just throw away some of the end to make this be the case, but rust provides an (unstable) alloc API that lets us request this alignment 4. We get the following code

struct Stack {
    data: *mut u8,
    size: usize
}

impl Stack {
    fn new(size: usize) -> Stack {
        assert_eq!((size + 8) % 16, 0);
        let data = unsafe{ allocate(size, 16 ) };
        if data.is_null() {
            // Like most rust code we don't handle out of memory errors, just crash.
            alloc::oom::oom();
        }
        Stack{ data, size }
    }
}

impl Drop for Stack {
    fn drop(&mut self) {
        unsafe{
            alloc::heap::deallocate(self.data, self.size, 16);
        }
    }
}

We could then make a start function wrapper that takes a Stack instead of a *mut u8, but instead we will be a bit more generic and use a trait in case someone wants to use memory from somewhere else 5. The trait is unsafe because a bad implementation that returns non aligned memory, too small a buffer, or just null/an invalid pointer, can result in a segfault.

pub unsafe trait StackData {
    /// Returns a buffer to use as a stack. The stack must be 16 byte aligned
    /// offset by 8 bytes, i.e. (addr + 8) % 16 == 0. It must be large enough
    /// that the program doesn't overflow. Only addresses before the address
    /// pointed to are read/written, so it is ok if that points one past the end
    /// of the allocation.
    fn start_ptr(&mut self) -> *mut u8;
}

unsafe impl StackData for Stack {
    fn start_ptr(&mut self) -> *mut u8 {
        unsafe{ self.data.offset(self.size as isize) }
    }
}

The rest of the implementation of start (the wrapper version), follows with inline comments.

// We pass this to the C abi following function.
struct StartArgs<F: FnOnce(S, StackPtr) -> !, S: StackData> {
    // A closure, which will run the user supplied code (being a FnOnce closure
    // allows users to move data into the new stack via captures). 
    func: F,
    // We are also going to pass in the buffer that the coroutine is using as a
    // stack to allow for memory management (this ends up being very useful for
    // coroutines).
    stack: S
}

pub fn start<F, S: StackData>(mut stack: S, func: F)
where F: FnOnce(S, StackPtr) -> ! {

    // Notice that this C function is generic over the closure passed in and the type 
    // of buffer being used (i.e. really a set of C functions, one for each combination).
    extern "C" fn f_wrapper<F, S>(parent: StackPtr, args: *mut ()) -> !
    where F: FnOnce(S, StackPtr) -> !, S: StackData {
        // Unpack the arguments passed in via ptr.
        let StartArgs{func, stack} = unsafe{ ptr::read(args as *mut StartArgs<F, S>) };
        // Call  the function
        func(stack, parent)
    }

    // Pack the data, call the function.
    let data = stack.start_ptr();
    let mut args = StartArgs{ func, stack };

    unsafe {
        asm::start(data, f_wrapper::<F, S>, &mut args as *mut StartArgs<F, S> as *mut ());
    }

    // Don't free the stuff we are moving into the new stack via ptr.
    mem::forget(args);
}

5 Testing

Testing on this project was done via cargo's examples instead of regular tests because I wanted something very easy to run a debugger on 6, I didn't want to be on a non main thread (since I believe having one stack be the OS allocated main stack adds complications, that I want to test for), and because using examples made it easy to experiment with the API, as when one stopped compiling it wouldn't break anything else. What follows is (half of) my test for making sure the above code works.

println!("Hello world");
start(&mut stack, |_stack, parent| exit_to(parent));
println!("Whoops, false start");
println!("Ping");
start(&mut stack, |_stack, parent| {
    println!("Pong");
    switch(parent, unsafe{ &mut PING_PONG} );
    println!("Polo");
    exit_to(unsafe{ std::ptr::read(&PING_PONG) });
});
println!("Marco");
unsafe{ switch(std::ptr::read(&PING_PONG), &mut PING_PONG) };

prints

Hello world
Whoops, false start
Ping
Pong
Marco
Polo

6 Generators

We have some useful primitives now, lets make something that is actually useful to end users. First we will be implementing python like "generator" functions, the basic api looks like

let extremely_inefficient_range = Generator::new(|yield_val| {
    for i in 0... 10 {
        yield_val(i);
    }
});

for i in extremely_inefficient_range {
    println!("{}", i);
}

which is approximately equivalent to the following python

def extremely_inefficient_range():
    for i in range(0, 10):
        yield(i)

for i in extremely_inefficient_range():
    print(i)

In pseudocode what we want to do looks like

fn yield_val(arg) {
    store arg somewhere that the main stack can find it.
    switch back to main stack
}

// Runs on new stack
fn start() (
    switch back to main stack
    run user supplied code
)

fn next() {
    If the user supplied code hasn't returned yet: 
         switch to that stack until yield_val is called or it returns. 
         Return the yielded value (or None).

    otherwise return None.
}

We're going to need somewhere to store the yielded values, and the stack pointers, with an address known to both stacks. We'll handle this by allocating space on the heap for the following structure.

// This is created boxed.
struct GeneratorInner<T> {
    // Holds yielded values to move them off the Generator's stack.
    holder: Option<T>,
    // This always holds the StackPtr of the stack not currently being used.
    // It is None when we are on the main stack and the generator has finished
    // executing.
    stack: Option<StackPtr>
}

We can then yield by just setting holder to Some(value_yielded), and switching stacks. The actual code to do this is a bit ugly simply because of the amount of unsafe involved.

&mut |yielded_val| {
    unsafe {
        // gen is a captured pointer to a GeneratorInner struct, here we are
        // setting the held value to the yielded one.
        (*gen).holder = Some(yielded_val);
        // We get the current StackPtr stored in gen, which we will switch to.
        let stack = std::ptr::read((*gen).stack.as_ref().unwrap());
        // We also get a pointer to the location of that StackPtr, which we will overwrite
        // with the StackPtr needed to switch back to us.
        let stack_ptr = (*gen).stack.as_mut().unwrap();
        // Then we do the switch
        primitives::switch(stack, stack_ptr);
    }
}

We can read values from the generator as described in the pseudocode.

if self.inner.stack.is_some() {
    // User supplied code hasn't exited yet. Run it.
    primitives::switch(
    unsafe{ std::ptr::read(self.inner.stack.as_ref().unwrap()) },
    self.inner.stack.as_mut().unwrap());

    // Return the value it yields, if any.
    self.inner.holder.take()
}
else {
    None
}

There's a tiny bit more code needed, but nothing particularly interesting. If you're interested I'd encourage you to just read the code. It's less than 70 lines long. It all comes together to make a beautiful api, here's the test code I ended up with as an example

extern crate coroutines as co;

use co::generator::Generator;

fn range(mut low: usize, high: usize, step: usize) -> Generator<usize> {
    Generator::new(move |yield_val| {
        while low < high {
            yield_val(low);
            low += step;
        }
    })
}

fn main() {
    let mut fib = Generator::new(|yield_val| {
        let mut current = 0;
        let mut next = 1;
        loop {
            yield_val(current);
            next = next + current;
            current = next - current;
        }
    });

    println!("Printing out the fib sequence");
    for x in &mut fib {
        println!("{}", x);
        if x > 500 {
            break;
        }
    }

    println!("Taking a break and printing out a range");
    for x in range(0, 20, 2) {
        println!("{}", x);
    }

    println!("Back to the fib sequence, from where we left off");
    for x in fib {
        if x > 10_000 {
            break;
        }
        println!("{}", x);
    }

    println!("Nested generators just work");
    let outer: Generator<usize> = Generator::new(|yield_val| {
        let mut start: usize = 0;
        loop {
            // For some reason rust can't figure out the type returned by sum (why?)
            yield_val(range(start, start * 10, start).sum::<usize>());
            start += 1;
        }
    });

    for x in outer.take(10) {
        println!("{}", x);
    }
}

7 Coroutines

7.1 Initial implementation

Implementing coroutines from here is just a matter of creating a runtime that manages stacks, and when one blocks switches to another that isn't blocked.

We're going to need (yet another) layer of abstraction around start, here it is

// This is a good point to note that all our global runtime state will be 
// unique to each OS thread, avoiding concurrency issues.

// thread_local! is a macro that makes thread local global variables (each thread 
// gets it own copy), which are lazily initialized on first use.
thread_local! {
    // We're going to need a list of stacks waiting to execute.
    static WAITING_THREADS: RefCell<Vec<StackPtr>> = RefCell::new(vec!());

    // This holds a stack which just figures out what to execute next and switches
    // to it. Which ended up being a very convenient way to implement yielding execution.
    static YIELD_THREAD: UnsafeCell<StackPtr> = UnsafeCell::new(make_yield_thread());
}

/// Takes a closure and runs it on a new coroutine
pub fn start<F: FnOnce() -> () + 'static + panic::UnwindSafe >(f: F) {
    let stack = Stack::new(32 * 1024 - 8);

    primitives::start(stack, move |stack, parent| {
        // Add our parent to the list of threads waiting to execute.
        // The `.with` syntax here is just how variables defined in thread_local! have
        // to be accessed (the argument to the closure is a reference to the contents of
        // the variable).
        WAITING_THREADS.with(|threads| {
            threads.borrow_mut().push(parent)
        });

        // We catch panic's here so the coroutine panicking doesn't kill the entire OS thread.
        let res = panic::catch_unwind(f);

        if let Err(_) = res {
            eprintln!("Coroutine panicked, killing");
        }

        unsafe {
            // get the stack ptr for the yield thread from the thread local 
            let thread = YIELD_THREAD.with(|it| ptr::read(it.get()));
            // and switch to it
            primitives::exit_to(thread);
        }
    });
}

For now YIELD_THREAD will just switch to the next waiting stack, later it will also need to handle polling for an unblocked stack to switch to.

fn make_yield_thread() -> StackPtr {
    // This is a unstable way of making thread locals that don't need to be lazily
    // initialized and accessed via a closure/.with function that was convenient here.
    // We use this to store the result of a `switch` call, so the new stack can return it.
    #[thread_local]
    static mut RET: StackPtr = NULL_STACK_PTR;

    // Stack a new coroutine
    let stack = Stack::new(16 * 1024 - 8);
    primitives::start(stack, move |_stack, parent| {
        // Immediately switches back to it's parent, we start doing work once a thread yields.
        unsafe{ primitives::switch(parent, &mut RET) };

        WAITING_THREADS.with( |threads|
            loop {
                // Whenever we switch to this coroutine switch to the first waiting thread.
                let maybe_thread = threads.borrow_mut().pop();
                match maybe_thread {
                    Some(thread) => {
                        let ptr = YIELD_THREAD.with(|it| it.get());
                        unsafe{ primitives::unsafe_switch(thread, ptr) };
                    },
                    None => {
                        // For now (with no non blocking IO) we know a waiting thread exists.
                        unreachable!();
                    }
                }
            }
        )
    });
    unsafe{ return ptr::read(&RET) };
}

The observant student will notice that we are currently leaking the Stack objects we create, we'll fix that by creating a pool of stacks that we reuse.

thread_local! {
    static INACTIVE_STACKS: RefCell<Vec<Stack>> = RefCell::new(vec!());
}

Replace the Stack::new line in start with

// Get a existing, unused stack if one exists.
let mut maybe_stack = None;
INACTIVE_STACKS.with(|stacks| maybe_stack = stacks.borrow_mut().pop());
// Otherwise allocate a new one.
let stack = maybe_stack.unwrap_or_else(|| Stack::new(32 * 1024 - 8));

And before the last unsafe block in start where we exit, add the stack to the list of inactive stacks.

INACTIVE_STACKS.with(|stacks| {
    stacks.borrow_mut().push(stack);
});

7.2 Integrating mio (async io)

Next we integrate mio (rust's cross platform epoll abstraction) to get asynchronous IO. Most of the work here is just interacting with mio which I'll skim over. None of the principles behind this are tied to mio, it was just a convenient way to make a quick proof of concept. We'll create an await function that looks like

thread_local! {
    // Global MIO objects
    static MIO_POLL: RefCell<mio::Poll> = RefCell::new(mio::Poll::new().unwrap());
    static MIO_EVENTS: RefCell<mio::Events> = RefCell::new(mio::Events::with_capacity(1024));

    // A count of coroutines currently blocked waiting on mio.
    static MIO_ACTIVE: Cell<usize> = Cell::new(0);

    // Information to keep track of what stack we should start when mio tells us something is
    // ready to execute.
    static MIO_TOKEN_MAP: RefCell<Vec<StackPtr>> = RefCell::new(vec!());
    static MIO_FREE_TOKEN: RefCell<Vec<usize>> = RefCell::new(vec!());
}

pub fn await<E: mio::Evented>(handle: &E, interest: mio::Ready) {
    // Handle is something like a TcpStream, interest is something like 
    // Readable (unblock when the stream is ready to be read from)


    MIO_POLL.with(|poll| {
        MIO_TOKEN_MAP.with(|map| {
            // Get a token which mio will return when we are ready to execute again.
            // (and allocate space in the map from tokens to stack pointers if needed).

            let maybe_token = MIO_FREE_TOKEN.with(|free|
                free.borrow_mut().pop()
            );

            let token = maybe_token.unwrap_or_else(|| {
                let mut map = map.borrow_mut();
                map.push(NULL_STACK_PTR);
                map.len() - 1
            });

            {
                // Register the token with mio, handle and interest are passed in by
                // the user (described above). The logic to reregister (needed if the
                // token has been used before) is a bit ugly here but it works.

                let poll = poll.borrow();
                let res = poll.register(handle, mio::Token(token), interest,
                    mio::PollOpt::oneshot());
                if !res.is_ok() {
                    poll.reregister(handle, mio::Token(token), interest,
                        mio::PollOpt::oneshot()).expect("Failed to register");
                }
            }

            // Increment counter.
            MIO_ACTIVE.with(|active| active.set(active.get() + 1));

            // Switch threads, storing the stack pointer to switch back
            // in the map associated with the token we registered with mio.
            unsafe {
                let thread = YIELD_THREAD.with(|it| ptr::read(it.get()));
                let ptr = {
                    // Need scope to force drop of map (?)
                    let mut mbm = map.borrow_mut();
                    let ptr = &mut mbm[token] as *mut StackPtr;
                    ptr
                };
                primitives::unsafe_switch(thread, ptr);
            }
        })
    });
}

We'll have to modify our yield thread to poll mio if there are no waiting threads. The unreachable!() executed when there are no waiting threads is replaced with

MIO_EVENTS.with(|events| {
    // Poll mio for events (blocking until something is ready).
    MIO_POLL.with(|poll|
        poll.borrow().poll(&mut *events.borrow_mut(), None).expect("mio poll failed")
    );

    // For each event, switch to it's thread in turn. Notice that we will stay in this 
    // portion of the code until we have handled every single event. (This is why
    // implementing this as a separate stack was so convenient).
    for event in events.borrow().iter() {
        let thread = get_stack(event);
        MIO_ACTIVE.with(|active| active.set(active.get() - 1));

        let ptr = YIELD_THREAD.with(|it| it.get());
        unsafe{ primitives::unsafe_switch(thread, ptr) };
    }
});

There's one remaining problem, and that is that if code with blocking coroutines returns from main, it will instantly terminate instead of finishing those coroutines. So we make a function that waits for all coroutines to be done before exiting.

pub fn run() {
    let mut finished = false;
    loop {
        // Check if there are any waiting of blocked threads
        WAITING_THREADS.with(|threads|
            MIO_ACTIVE.with(|active|
                finished = active.get() == 0 && threads.borrow().is_empty()
            )
        );

        // If not, return
        if finished { break; }

        // Otherwise go run one of them (waiting for one to unblock if necessary).
        unsafe {
            let thread = YIELD_THREAD.with(|it| ptr::read(it.get()));
            primitives::unsafe_switch(thread, new_waiting());
        }
    }
}

8 Echo server example

I think this mostly stands on its own. The only thing I would note is that if you were serious about using this library you should wrap TcpStream and such in structs that automatically call await instead of calling it manually.

extern crate coop;
extern crate mio;

use coop::coroutine as co;
use mio::tcp::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::io::{Read, Write};

fn echo(mut stream: TcpStream, _addr: SocketAddr) {
    let mut buf = [0u8; 2048];
    let mut used = 0;
    loop {
        if used == 0 {
            co::await(&stream, mio::Ready::readable());
            used = stream.read(&mut buf[used..]).expect("Failed to read from stream");
            if used == 0 {
                break;
            }
        }
        else {
            co::await(&stream, mio::Ready::writable());
            let res = stream.write(&buf[..used]).expect("Failed to write to stream");
            // Manual memmove, don't know why this isn't in std.
            for i in 0.. used - res {
                buf[i] = buf[res + i];
            }
            used -= res;
        }
    }
}

fn main() {
    let addr = "127.0.0.1:13266".parse().unwrap();
    let server = TcpListener::bind(&addr).unwrap();
    loop {
        co::await(&server, mio::Ready::readable());
        match server.accept() {
            Ok((stream, addr)) => {
                co::start(move || echo(stream, addr));
            }
            Err(e) => {
                println!("Error accepting: {:?}", e);
            }
        }
    }
}

9 Remaining work, issues, and questions

  • The linux kernel will apparently free pages past rsp. Does this imply that having rsp point to the heap is going to cause it to allocate as large a stack as possible, or is that done lazily (seems probable)? Does this mean that using a stack-allocated stack for a coroutine will cause undefined behaviour as the kernel unmaps the stack of the main thread past the coroutines stack?
  • Obviously I've written a huge amount of unsafe code here. (And this is entirely reasonable, I'm interacting with assembly, which the compiler cannot reason about. And I'm implementing new forms of control flow, which the compiler can't reason about). I think the high level APIs I've exposed are safe to use, but I'm not entirely sure. The lifetime of the closures are required to be 'static, so any variables referenced in them will live forever. Generator's can't be sent across threads, and there's nothing to send with coroutines so there should be no way to break the expectations of Send and Sync. We catch panics, so they shouldn't cross the ffi boundary into assembly. The most obvious potential issue is that sending something from one stack to another might break expectations about lifetimes, Send, or Sync somehow, but I haven't thought of a way how.
  • If anyone is actually interested in using this code the assembly should be ported to the other platforms. It shouldn't be hard, it just needs to be done.
  • Without segmented stacks we are stuck allocating reasonable sized stacks and hoping we don't overflow them. This increases memory requirements and creates a chance of crashing in mysterious ways due to stack overflows. Unfortunately I'm pretty sure it's not possible to fix this without compiler support.
  • Would this actually be useful for anyone if it was polished into something production ready? How does it compare to existing libraries? I've only given them a cursory glance.

Footnotes:

1
Otherwise simd instructions will segfault
2
Actually on amd64 it's free to do that only for addresses 128 bytes above rsp, allowing small functions that don't call other functions to not increment the stack pointer
3
Which I suppose could be useful if you want a bigger stack than the maximum allowed by rlimit
4
I originally tried using a Vec<i128>, turns out that i128's are only 8 byte aligned despite being 16 bytes long
5
I've also implemented a stack allocated stack, though this may be a bad idea as noted in the questions at the end. More usefully you might want to implement your own allocator with mmap, and try and detect stack overflows with a guard page
6
Which was necessary more than once… I even managed to crash gdb a few times

Created: 2017-06-17 Sat 10:37

Validate