Implementing cooperative multitasking in Rust
I've implemented coroutines and Python-like generators in Rust (with a bit of assembly). This code was written as a learning exercise rather than as something intended for production. There are already several libraries in Rust that do approximately the same thing. I believe they all call out to the hand-written assembly in Boost's context library instead of writing it themselves.
- The code is available here.
- I assume rudimentary knowledge of what a call stack is.
- I am writing for the AMD64 architecture, System V ABI (which means Intel/AMD 64 bit CPUs running pretty much anything but Windows). There's no reason that I couldn't write this for another architecture/ABI, and it would be very easy to port; I just haven't.
- Comments in code are meant to be read. All the comments were written for this blog post, not copied from the code base.
- I use some assembly in this post. I'm using the Intel syntax instead of the AT&T syntax since I think it's more readable for those not familiar with assembly. In the code base I had to use AT&T syntax for technical reasons (discussed below).
- If you're not familiar with assembly, the following will hopefully be sufficient to read this post. You have registers such as rax, rsp, r12, … which hold 64 bit values. The syntax [rax] means the value of the memory pointed to by rax. [rax + 8] means the value of the memory pointed to by rax + 8. We use the following instructions:

  mov x, y  ; Sets x to the value of y
  sub x, y  ; Sets x to x - y
  jmp value ; Changes the instruction pointer to value,
            ; we will now start executing from there
  ret       ; Jumps to [rsp] and increments rsp by 8
1 The (amd64 System V) ABI
The ABI specifies how functions are called at the assembly level. The most
important part of this is specifying how the call stack works. Each function
called gets a stack frame starting where rsp (the stack pointer register)
points when it is called, which is guaranteed to be 16 byte aligned 1. It can
allocate space for local variables by decrementing rsp (the stack grows towards
lower addresses), and then writing to where it points. The operating system is
free to do whatever it wants with memory above rsp (that is, at lower valued
addresses), including writing to it or unmapping it so that accesses result in
segfaults 2.
 
When calling a function we push the address of the instruction to return to onto
the stack, store arguments in the right places (for our purposes it suffices to
know that the first 3 integer/pointer arguments get put in registers rdi,
rsi, and rdx), and then jump to the first instruction in the function
called. When returning from a function we pop the return address off the
stack (leaving rsp pointing right before it), and jump to that address.
Called functions must ensure that rbx, rbp, r12, r13, r14,
r15, the MXCSR control register, and the x87 control word have the same value
when they return as when they were called (these are known as callee save
registers), and are free to change the value of any other register.
2 Using multiple stacks
To implement coroutines and generators we are going to use multiple call stacks.
If you consider the ABI, there is nothing restricting you to using rsp exactly
as intended (incrementing/decrementing as a pointer to the top of a single
stack), it just always needs to point to some memory with sufficient allocated
space "above" it (at lower addresses) so that it doesn't overflow into anything
bad. In particular it's possible to "switch" stacks by changing rsp from
pointing to the top of one stack to the top of another.
In its simplest form this could look like

mov rsp, <new stack>  ; set the stack pointer to point at the new stack
jmp <some function>   ; and start executing some function
At this point some function will execute just fine on a new stack, as long as
it doesn't return. As it turns out it's easy to specify that it should never
return in Rust: we just give it a return type of !. We also want some way of
actually calling this code (as well as a consistent environment for the next
step), so we are going to wrap it in a function that takes the address of the
bottom of the stack as the first parameter (rdi), and the address of the
function to call as the second parameter (rsi).

mov rsp, rdi
jmp rsi
To be actually useful we are going to want some way of continuing on the original stack, otherwise this just turns into a fancy way to get a bigger stack 3. To do this we will store the callee save registers onto the stack, and pass the pointer to the current stack to the called function.
sub rsp, 64          ; Allocate space on the stack for the callee save registers
mov [rsp], rbx       ; copy all the callee save registers into it
mov [rsp + 8], rbp
mov [rsp + 16], r12
mov [rsp + 24], r13
mov [rsp + 32], r14
mov [rsp + 40], r15
stmxcsr [rsp + 48]   ; We need to use special instructions for the control word registers
fnstcw [rsp + 52]    ; but these are both effectively just mov instructions.
mov rax, rsp         ; swap the address of the current stack (rsp) and new stack (rdi, the first argument)
mov rsp, rdi
mov rdi, rax
jmp rsi              ; jump to the new function
This is now almost the start function I actually want, the only addition is it
turns out to be very useful to forward an argument to the function (a pointer to
some data that we can use to pass arguments and move objects into the new stack).
sub rsp, 64
mov [rsp], rbx
mov [rsp + 8], rbp
mov [rsp + 16], r12
mov [rsp + 24], r13
mov [rsp + 32], r14
mov [rsp + 40], r15
stmxcsr [rsp + 48]
fnstcw [rsp + 52]
mov rax, rsp
mov rsp, rdi
mov rdi, rax
mov rax, rsi         ; save the address we want to jump to
mov rsi, rdx         ; move the pointer we are forwarding from rdx (arg 3) to rsi (arg 2).
jmp rax              ; jump to the new function, which we saved in rax
To switch back from the new stack we can switch rsp back to the stack pointer
we were passed, restore the callee save registers, and return. I'm going to wrap
this code in a function, which expects that pointer as an argument (in rdi).
Note that the return statement will pop the return pointer from the stack we
just switched to and start executing there, not the stack where this "return"
function (which I have called exit_to) is called from.
mov rsp, rdi         ; Switch stack pointer back
mov rbx, [rsp]       ; copy all the callee save registers back from the stack
mov rbp, [rsp + 8]
mov r12, [rsp + 16]
mov r13, [rsp + 24]
mov r14, [rsp + 32]
mov r15, [rsp + 40]
ldmxcsr [rsp + 48]   ; Again we need to use special instructions for the control word
fldcw [rsp + 52]     ; registers but these are both effectively just mov instructions.
add rsp, 64          ; free the space we allocated on the stack
ret                  ; pop and jump to the return address
Like our initial implementation of switching to a new stack, this gives no way
to switch back, though in this case it still ends up being a useful function. We
can combine the two functions to make a switch function. The only question is
what to do with the original stack pointer, since we aren't calling a function
we can pass it to, and returning it ends up being fairly inconvenient. Instead
we'll take a second argument as the address at which we should store the stack
pointer (the first argument being the stack pointer to switch to).
; Save callee save registers as in start
sub rsp, 64
mov [rsp], rbx
mov [rsp + 8], rbp
mov [rsp + 16], r12
mov [rsp + 24], r13
mov [rsp + 32], r14
mov [rsp + 40], r15
stmxcsr [rsp + 48]
fnstcw [rsp + 52]

mov [rsi], rsp       ; Save original stack pointer
mov rsp, rdi         ; Switch into new stack

; Restore callee save registers and return as in exit_to
mov rbx, [rsp]
mov rbp, [rsp + 8]
mov r12, [rsp + 16]
mov r13, [rsp + 24]
mov r14, [rsp + 32]
mov r15, [rsp + 40]
ldmxcsr [rsp + 48]
fldcw [rsp + 52]
add rsp, 64
ret
3 Wrapping primitives in Rust
3.1 Writing assembly in rust
This section (3.1) is skippable. It is intended primarily as feedback to help drive rust development, and not targeted to as general an audience. The current solutions are unstable, buggy, and likely to change in the near future.
We've written some cool assembly above, but now we need a way to call it from
our actual code. There are three ways to call assembly in rust. Compile it
separately and link it like any other object file (like if one were linking C
code), use naked functions and the asm! macro, or use the global_asm!
macro.
I'd rather avoid the first if possible, since it means requiring a separate compiler, and having the assembly source far removed from the rust headers.
I initially went with the second, but rust issue 34043 means that it will result
in undefined behaviour. I also encountered an issue where the compiler would
inline my assembly, and rely on me not clobbering registers I didn't tell it I
would clobber. So I would have to tell the compiler I clobber every single caller
save register to avoid undefined behaviour (as does any assembly that calls a
user supplied function!?). Which apart from being a pain, doesn't seem to be
possible if I want to support future CPUs with yet more extensions. (The
specific clobber code gen issue could be worked around with #[inline(never)],
but I doubt that's a robust solution).
So I've ended up using the third method. This has three minor drawbacks. It only
supports AT&T assembly syntax (the technical issue previously mentioned), names
aren't mangled so we have to worry about name collisions, and I have to repeat
the function name 3 times when defining it. I tried to use a macro (code below)
to avoid the third drawback, but it turns out that trying to pass the output of
concat! to global_asm! will result in an error "expected a literal" so (at
least without procedural macros) that isn't possible.
// Warning, untested and doesn't work
macro_rules! asm_function {
    (pub unsafe extern "C" fn $name:ident($($arg:ident : $arg_type:ty),*) -> $ret_type:ty { $asm:expr } ) => (
        extern "C" {
            pub fn $name($($arg: $arg_type),*) -> $ret_type;
        }
        global_asm!(
            concat!(
                " .global ", $name, "\n",
                $name, ":\n",
                $asm
            )
        );
    );
    (pub unsafe extern "C" fn $name:ident($($arg:ident : $arg_type:ty),*) { $asm: expr } ) => (
        asm_function!(pub unsafe extern "C" $name ($($arg : $arg_type),*)) -> () $asm
    )
}
3.2 Function signatures
Regardless of the method chosen for writing assembly, we will effectively be writing rust function signatures to tell the compiler how to call the assembly. They look like the following
// We make a newtype here for stack pointers to help prevent them being
// misused (particularly useful is that they are given move semantics, so
// users can't make copies and try to switch to the same stack pointer twice).
#[repr(C)]
pub struct StackPtr(usize);

extern "C" {
    // Our start function takes 3 arguments: A pointer to the end of a buffer that
    // we will use as a stack. A pointer to a function that we will call, which in turn
    // - Takes two arguments (a pointer to the stack we called it on, and a pointer
    //   to some argument).
    // - Must never return.
    // Finally we take a pointer to the arguments to pass to the start function.
    // Unlike exit_to this function can return, at least as the calling code can observe,
    // when a stack uses `exit_to` or `switch` to switch back to us. It returns no value.
    // It's already unsafe to call this function, so it doesn't make it any more unsafe
    // to call a user supplied function which is also unsafe. Hence we allow for that.
    pub fn start(buf: *mut u8,
                 to_call: unsafe extern "C" fn(StackPtr, *mut()) -> !,
                 args: *mut ());

    // The simplest function. We take a stack pointer (that we will switch to), and we
    // never return to this stack again (we can't since we forget where the top of it is).
    pub fn exit_to(stack: StackPtr) -> !;

    // Takes a stack pointer to switch to, and a pointer to which we are going to store
    // our current stack pointer. Like start this function can return indirectly.
    pub fn switch(target: StackPtr, save: *mut StackPtr);
}
4 Wrapping start
Both exit_to and switch are already somewhat convenient to use from rust
code (at least for implementing better abstractions like coroutines). start
not so much since we have to deal with casting pointers around, manual memory
management, etc. We can do better.
It also turns out that actually allocating a stack is not entirely trivial, since the ABI has alignment requirements. A function expects rsp to have been 16 byte aligned before its return address was pushed, so on entry the stack pointer must be offset by 8 bytes from a 16 byte aligned address. We could just throw away some of the end of the buffer to make this the case, but Rust provides an (unstable) alloc API that lets us request this alignment 4. We get the following code
struct Stack {
    data: *mut u8,
    size: usize
}

impl Stack {
    fn new(size: usize) -> Stack {
        // The end of the buffer (data + size) must be 8 bytes off a 16 byte boundary.
        assert_eq!((size + 8) % 16, 0);
        let data = unsafe{ alloc::heap::allocate(size, 16) };
        if data.is_null() {
            // Like most rust code we don't handle out of memory errors, just crash.
            alloc::oom::oom();
        }
        Stack{ data, size }
    }
}

impl Drop for Stack {
    fn drop(&mut self) {
        unsafe{ alloc::heap::deallocate(self.data, self.size, 16); }
    }
}
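The unstable allocation functions used above have since been replaced in the standard library. As a rough sketch, assuming a toolchain with the stable std::alloc API, the same aligned buffer could be requested as shown here. The names AlignedStack and top are made up for this illustration and are not part of the code base.

```rust
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};

/// Alignment requested for the start of the buffer.
const STACK_ALIGN: usize = 16;

/// A heap-allocated buffer intended for use as a call stack (illustrative only).
pub struct AlignedStack {
    data: *mut u8,
    layout: Layout,
}

impl AlignedStack {
    /// `size` must satisfy `(size + 8) % 16 == 0`, mirroring the assertion in
    /// the post, so that the one-past-the-end address is 8 bytes off a
    /// 16 byte boundary.
    pub fn new(size: usize) -> AlignedStack {
        assert_eq!((size + 8) % STACK_ALIGN, 0);
        let layout = Layout::from_size_align(size, STACK_ALIGN).expect("invalid layout");
        // SAFETY: `size` is non-zero, because 0 would fail the assertion above.
        let data = unsafe { alloc(layout) };
        if data.is_null() {
            // Same policy as the post: give up on out-of-memory.
            handle_alloc_error(layout);
        }
        AlignedStack { data, layout }
    }

    /// Address one past the end of the buffer, where the new stack would begin.
    pub fn top(&self) -> *mut u8 {
        // SAFETY: computing the one-past-the-end pointer of an allocation is allowed.
        unsafe { self.data.add(self.layout.size()) }
    }
}

impl Drop for AlignedStack {
    fn drop(&mut self) {
        // SAFETY: `data` was allocated with exactly this layout.
        unsafe { dealloc(self.data, self.layout) };
    }
}
```

Keeping the Layout in the struct means the size and alignment passed to dealloc cannot drift from those used to allocate.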
We could then make a start function wrapper that takes a Stack instead of a
*mut u8, but instead we will be a bit more generic and use a trait in case
someone wants to use memory from somewhere else 5. The trait is unsafe
because a bad implementation that returns non aligned memory, too small a buffer,
or just null/an invalid pointer, can result in a segfault.
pub unsafe trait StackData {
    /// Returns a buffer to use as a stack. The stack must be 16 byte aligned
    /// offset by 8 bytes, i.e. (addr + 8) % 16 == 0. It must be large enough
    /// that the program doesn't overflow. Only addresses before the address
    /// pointed to are read/written, so it is ok if that points one past the end
    /// of the allocation.
    fn start_ptr(&mut self) -> *mut u8;
}

unsafe impl StackData for Stack {
    fn start_ptr(&mut self) -> *mut u8 {
        unsafe{ self.data.offset(self.size as isize) }
    }
}
The rest of the implementation of start (the wrapper version), follows with
inline comments.
// We pass this to the C abi following function.
struct StartArgs<F: FnOnce(S, StackPtr) -> !, S: StackData> {
    // A closure, which will run the user supplied code (being a FnOnce closure
    // allows users to move data into the new stack via captures).
    func: F,
    // We are also going to pass in the buffer that the coroutine is using as a
    // stack to allow for memory management (this ends up being very useful for
    // coroutines).
    stack: S
}

pub fn start<F, S: StackData>(mut stack: S, func: F)
    where F: FnOnce(S, StackPtr) -> !
{
    // Notice that this C function is generic over the closure passed in and the type
    // of buffer being used (i.e. really a set of C functions, one for each combination).
    extern "C" fn f_wrapper<F, S>(parent: StackPtr, args: *mut ()) -> !
        where F: FnOnce(S, StackPtr) -> !, S: StackData
    {
        // Unpack the arguments passed in via ptr.
        let StartArgs{func, stack} = unsafe{ ptr::read(args as *mut StartArgs<F, S>) };
        // Call the function
        func(stack, parent)
    }

    // Pack the data, call the function.
    let data = stack.start_ptr();
    let mut args = StartArgs{ func, stack };
    unsafe {
        asm::start(data, f_wrapper::<F, S>, &mut args as *mut StartArgs<F, S> as *mut ());
    }
    // Don't free the stuff we are moving into the new stack via ptr.
    mem::forget(args);
}
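The core trick in this wrapper, moving a generic closure through an untyped pointer into a C ABI function, can be tried in isolation. The sketch below is a miniature built on assumptions: call_with_untyped_arg is a hypothetical stand-in for the assembly start that simply calls the function on the current stack, and unlike the real callee the trampoline returns normally.

```rust
use std::mem;
use std::ptr;

/// Hypothetical stand-in for the assembly `start`: all it knows about is a
/// C ABI function pointer and an untyped argument pointer. The real version
/// also switches stacks, and its callee never returns.
fn call_with_untyped_arg(to_call: extern "C" fn(*mut ()), arg: *mut ()) {
    to_call(arg);
}

/// Runs `func` by passing it through `call_with_untyped_arg`.
pub fn run_closure<F: FnOnce()>(func: F) {
    // One copy of this function is generated per closure type `F`, so each
    // copy knows what type to cast the untyped pointer back to.
    extern "C" fn trampoline<F: FnOnce()>(arg: *mut ()) {
        // Move the closure out of the caller's stack slot and take ownership.
        let func = unsafe { ptr::read(arg as *mut F) };
        func();
    }

    let mut func = func;
    call_with_untyped_arg(trampoline::<F>, &mut func as *mut F as *mut ());
    // Ownership was transferred by `ptr::read`, so the original must not be
    // dropped a second time here.
    mem::forget(func);
}
```

A call such as `run_closure(move || println!("{}", owned_string))` moves the captured value into the trampoline, which is the same mechanism the wrapper uses to move data onto the new stack.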
5 Testing
Testing on this project was done via cargo's examples instead of regular tests because I wanted something very easy to run a debugger on 6, I didn't want to be on a non main thread (since I believe having one stack be the OS allocated main stack adds complications, that I want to test for), and because using examples made it easy to experiment with the API, as when one stopped compiling it wouldn't break anything else. What follows is (half of) my test for making sure the above code works.
println!("Hello world");
start(&mut stack, |_stack, parent| exit_to(parent));
println!("Whoops, false start");

println!("Ping");
start(&mut stack, |_stack, parent| {
    println!("Pong");
    switch(parent, unsafe{ &mut PING_PONG} );
    println!("Polo");
    exit_to(unsafe{ std::ptr::read(&PING_PONG) });
});
println!("Marco");
unsafe{ switch(std::ptr::read(&PING_PONG), &mut PING_PONG) };
prints
Hello world
Whoops, false start
Ping
Pong
Marco
Polo
6 Generators
We have some useful primitives now, so let's make something that is actually useful to end users. First we will be implementing Python-like "generator" functions. The basic API looks like
let extremely_inefficient_range = Generator::new(|yield_val| {
    for i in 0..10 {
        yield_val(i);
    }
});

for i in extremely_inefficient_range {
    println!("{}", i);
}
which is approximately equivalent to the following python
def extremely_inefficient_range():
    for i in range(0, 10):
        yield(i)

for i in extremely_inefficient_range():
    print(i)
In pseudocode what we want to do looks like
fn yield_val(arg) {
    store arg somewhere that the main stack can find it.
    switch back to main stack
}
// Runs on new stack
fn start() {
    switch back to main stack
    run user supplied code
}
fn next() {
    If the user supplied code hasn't returned yet: 
         switch to that stack until yield_val is called or it returns. 
         Return the yielded value (or None).
    otherwise return None.
}
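To get a feel for this hand-off without any assembly, the same control flow can be imitated with an OS thread and a rendezvous channel. This is deliberately a different technique from the stack switching used in this post, and ThreadGenerator is a name invented for the sketch. It also starts running the user code immediately rather than on the first call to next, and it assumes the user code eventually returns.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Thread-backed imitation of the generator API (not the post's implementation).
pub struct ThreadGenerator<T> {
    values: Receiver<T>,
}

impl<T: Send + 'static> ThreadGenerator<T> {
    pub fn new<F>(body: F) -> ThreadGenerator<T>
    where
        F: FnOnce(&mut dyn FnMut(T)) + Send + 'static,
    {
        // Capacity 0 makes every send wait for a matching recv, so the user
        // code pauses inside `yield_val` until the consumer asks for a value.
        let (tx, rx) = sync_channel(0);
        thread::spawn(move || {
            let mut yield_val = |value: T| {
                // "Store the value where the main side can find it, then
                // hand control back." A send error means the consumer is
                // gone; this sketch simply ignores it.
                let _ = tx.send(value);
            };
            body(&mut yield_val);
            // `tx` is dropped when this closure ends, which makes `recv`
            // fail and `next` return None from then on.
        });
        ThreadGenerator { values: rx }
    }
}

impl<T> Iterator for ThreadGenerator<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // Wait until the user code yields a value or returns.
        self.values.recv().ok()
    }
}
```

With this in place, `ThreadGenerator::new(|yield_val| { for i in 0..10u32 { yield_val(i); } })` can be iterated like any other iterator. The stack switching version gets the same behaviour without a second OS thread or any synchronisation.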
We're going to need somewhere to store the yielded values, and the stack pointers, with an address known to both stacks. We'll handle this by allocating space on the heap for the following structure.
// This is created boxed.
struct GeneratorInner<T> {
    // Holds yielded values to move them off the Generator's stack.
    holder: Option<T>,
    // This always holds the StackPtr of the stack not currently being used.
    // It is None when we are on the main stack and the generator has finished
    // executing.
    stack: Option<StackPtr>
}
We can then yield by just setting holder to Some(value_yielded), and
switching stacks. The actual code to do this is a bit ugly simply because of the
amount of unsafe involved.
&mut |yielded_val| {
    unsafe {
        // gen is a captured pointer to a GeneratorInner struct, here we are
        // setting the held value to the yielded one.
        (*gen).holder = Some(yielded_val);
        // We get the current StackPtr stored in gen, which we will switch to.
        let stack = std::ptr::read((*gen).stack.as_ref().unwrap());
        // We also get a pointer to the location of that StackPtr, which we will overwrite
        // with the StackPtr needed to switch back to us.
        let stack_ptr = (*gen).stack.as_mut().unwrap();
        // Then we do the switch
        primitives::switch(stack, stack_ptr);
    }
}
We can read values from the generator as described in the pseudocode.
if self.inner.stack.is_some() {
    // User supplied code hasn't exited yet. Run it.
    primitives::switch(
        unsafe{ std::ptr::read(self.inner.stack.as_ref().unwrap()) },
        self.inner.stack.as_mut().unwrap());
    // Return the value it yields, if any.
    self.inner.holder.take()
} else {
    None
}
There's a tiny bit more code needed, but nothing particularly interesting. If you're interested I'd encourage you to just read the code. It's less than 70 lines long. It all comes together to make a beautiful api, here's the test code I ended up with as an example
extern crate coroutines as co;
use co::generator::Generator;

fn range(mut low: usize, high: usize, step: usize) -> Generator<usize> {
    Generator::new(move |yield_val| {
        while low < high {
            yield_val(low);
            low += step;
        }
    })
}

fn main() {
    let mut fib = Generator::new(|yield_val| {
        let mut current = 0;
        let mut next = 1;
        loop {
            yield_val(current);
            next = next + current;
            current = next - current;
        }
    });

    println!("Printing out the fib sequence");
    for x in &mut fib {
        println!("{}", x);
        if x > 500 {
            break;
        }
    }

    println!("Taking a break and printing out a range");
    for x in range(0, 20, 2) {
        println!("{}", x);
    }

    println!("Back to the fib sequence, from where we left off");
    for x in fib {
        if x > 10_000 {
            break;
        }
        println!("{}", x);
    }

    println!("Nested generators just work");
    let outer: Generator<usize> = Generator::new(|yield_val| {
        let mut start: usize = 0;
        loop {
            // For some reason rust can't figure out the type returned by sum (why?)
            yield_val(range(start, start * 10, start).sum::<usize>());
            start += 1;
        }
    });
    for x in outer.take(10) {
        println!("{}", x);
    }
}
7 Coroutines
7.1 Initial implementation
Implementing coroutines from here is just a matter of creating a runtime that manages stacks, and when one blocks switches to another that isn't blocked.
We're going to need (yet another) layer of abstraction around start, here it is
// This is a good point to note that all our global runtime state will be
// unique to each OS thread, avoiding concurrency issues.

// thread_local! is a macro that makes thread local global variables (each thread
// gets its own copy), which are lazily initialized on first use.
thread_local! {
    // We're going to need a list of stacks waiting to execute.
    static WAITING_THREADS: RefCell<Vec<StackPtr>> = RefCell::new(vec!());
    // This holds a stack which just figures out what to execute next and switches
    // to it. Which ended up being a very convenient way to implement yielding execution.
    static YIELD_THREAD: UnsafeCell<StackPtr> = UnsafeCell::new(make_yield_thread());
}

/// Takes a closure and runs it on a new coroutine
pub fn start<F: FnOnce() -> () + 'static + panic::UnwindSafe >(f: F) {
    let stack = Stack::new(32 * 1024 - 8);
    primitives::start(stack, move |stack, parent| {
        // Add our parent to the list of threads waiting to execute.
        // The `.with` syntax here is just how variables defined in thread_local! have
        // to be accessed (the argument to the closure is a reference to the contents of
        // the variable).
        WAITING_THREADS.with(|threads| {
            threads.borrow_mut().push(parent)
        });

        // We catch panics here so the coroutine panicking doesn't kill the entire OS thread.
        let res = panic::catch_unwind(f);
        if let Err(_) = res {
            eprintln!("Coroutine panicked, killing");
        }

        unsafe {
            // get the stack ptr for the yield thread from the thread local
            let thread = YIELD_THREAD.with(|it| ptr::read(it.get()));
            // and switch to it
            primitives::exit_to(thread);
        }
    });
}
For now YIELD_THREAD will just switch to the next waiting stack, later it will
also need to handle polling for an unblocked stack to switch to.
fn make_yield_thread() -> StackPtr {
    // This is an unstable way of making thread locals that don't need to be lazily
    // initialized and accessed via a closure/.with function that was convenient here.
    // We use this to store the result of a `switch` call, so the new stack can return it.
    #[thread_local]
    static mut RET: StackPtr = NULL_STACK_PTR;

    // Start a new coroutine
    let stack = Stack::new(16 * 1024 - 8);
    primitives::start(stack, move |_stack, parent| {
        // Immediately switches back to its parent, we start doing work once a thread yields.
        unsafe{ primitives::switch(parent, &mut RET) };

        WAITING_THREADS.with( |threads| loop {
            // Whenever we switch to this coroutine switch to the first waiting thread.
            let maybe_thread = threads.borrow_mut().pop();
            match maybe_thread {
                Some(thread) => {
                    let ptr = YIELD_THREAD.with(|it| it.get());
                    unsafe{ primitives::unsafe_switch(thread, ptr) };
                },
                None => {
                    // For now (with no non blocking IO) we know a waiting thread exists.
                    unreachable!();
                }
            }
        } )
    });
    unsafe{ return ptr::read(&RET) };
}
The observant student will notice that we are currently leaking the Stack objects we create, we'll fix that by creating a pool of stacks that we reuse.
thread_local! {
    static INACTIVE_STACKS: RefCell<Vec<Stack>> = RefCell::new(vec!());
}
Replace the Stack::new line in start with
// Get an existing, unused stack if one exists.
let mut maybe_stack = None;
INACTIVE_STACKS.with(|stacks| maybe_stack = stacks.borrow_mut().pop());
// Otherwise allocate a new one.
let stack = maybe_stack.unwrap_or_else(|| Stack::new(32 * 1024 - 8));
And before the last unsafe block in start where we exit, add the stack to the
list of inactive stacks.
INACTIVE_STACKS.with(|stacks| {
    stacks.borrow_mut().push(stack);
});
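The pooling pattern can be tried on its own. In the sketch below a plain Vec<u8> stands in for Stack, and the names acquire_buffer, release_buffer, and INACTIVE_BUFFERS are invented for the example. It leans on the fact that `.with` returns whatever its closure returns, which avoids the temporary mutable variable.

```rust
use std::cell::RefCell;

/// Size used for coroutine stacks in the post.
const BUFFER_SIZE: usize = 32 * 1024 - 8;

thread_local! {
    // Buffers handed back by finished coroutines, one list per OS thread.
    static INACTIVE_BUFFERS: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::new());
}

/// Reuse a pooled buffer if one exists, otherwise allocate a fresh one.
pub fn acquire_buffer() -> Vec<u8> {
    INACTIVE_BUFFERS
        .with(|pool| pool.borrow_mut().pop())
        .unwrap_or_else(|| vec![0u8; BUFFER_SIZE])
}

/// Hand a buffer back to the pool of the current OS thread.
pub fn release_buffer(buffer: Vec<u8>) {
    INACTIVE_BUFFERS.with(|pool| pool.borrow_mut().push(buffer));
}
```

Because the pool is thread local, a buffer released on one OS thread is only ever reused on that same thread, matching the rest of the runtime state.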
7.2 Integrating mio (async io)
Next we integrate mio (rust's cross platform epoll abstraction) to get
asynchronous IO. Most of the work here is just interacting with mio which I'll
skim over. None of the principles behind this are tied to mio, it was just a
convenient way to make a quick proof of concept. We'll create an await function
that looks like
thread_local! {
    // Global MIO objects
    static MIO_POLL: RefCell<mio::Poll> = RefCell::new(mio::Poll::new().unwrap());
    static MIO_EVENTS: RefCell<mio::Events> = RefCell::new(mio::Events::with_capacity(1024));
    // A count of coroutines currently blocked waiting on mio.
    static MIO_ACTIVE: Cell<usize> = Cell::new(0);
    // Information to keep track of what stack we should start when mio tells us something is
    // ready to execute.
    static MIO_TOKEN_MAP: RefCell<Vec<StackPtr>> = RefCell::new(vec!());
    static MIO_FREE_TOKEN: RefCell<Vec<usize>> = RefCell::new(vec!());
}

pub fn await<E: mio::Evented>(handle: &E, interest: mio::Ready) {
    // Handle is something like a TcpStream, interest is something like
    // Readable (unblock when the stream is ready to be read from)
    MIO_POLL.with(|poll| { MIO_TOKEN_MAP.with(|map| {
        // Get a token which mio will return when we are ready to execute again.
        // (and allocate space in the map from tokens to stack pointers if needed).
        let maybe_token = MIO_FREE_TOKEN.with(|free| free.borrow_mut().pop() );
        let token = maybe_token.unwrap_or_else(|| {
            let mut map = map.borrow_mut();
            map.push(NULL_STACK_PTR);
            map.len() - 1
        });

        {
            // Register the token with mio, handle and interest are passed in by
            // the user (described above). The logic to reregister (needed if the
            // token has been used before) is a bit ugly here but it works.
            let poll = poll.borrow();
            let res = poll.register(handle, mio::Token(token), interest, mio::PollOpt::oneshot());
            if !res.is_ok() {
                poll.reregister(handle, mio::Token(token), interest,
                                mio::PollOpt::oneshot()).expect("Failed to register");
            }
        }

        // Increment counter.
        MIO_ACTIVE.with(|active| active.set(active.get() + 1));

        // Switch threads, storing the stack pointer to switch back
        // in the map associated with the token we registered with mio.
        unsafe {
            let thread = YIELD_THREAD.with(|it| ptr::read(it.get()));
            let ptr = {
                // Need scope to force drop of map (?)
                let mut mbm = map.borrow_mut();
                let ptr = &mut mbm[token] as *mut StackPtr;
                ptr
            };
            primitives::unsafe_switch(thread, ptr);
        }
    }) });
}
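The token bookkeeping above (a vector indexed by token plus a list of free tokens) is a small data structure in its own right. Here is a self-contained sketch of the idea. TokenMap is a hypothetical name, and it stores Option<T> where the code above uses a NULL_STACK_PTR placeholder, so it illustrates the scheme rather than reproducing the code base.

```rust
/// Maps small integer tokens to values, reusing tokens that have been freed.
pub struct TokenMap<T> {
    // Indexed by token. None marks a slot whose token is currently free.
    slots: Vec<Option<T>>,
    // Tokens that were handed out earlier and have since been released.
    free: Vec<usize>,
}

impl<T> TokenMap<T> {
    pub fn new() -> Self {
        TokenMap { slots: Vec::new(), free: Vec::new() }
    }

    /// Stores `value` and returns the token it was stored under.
    pub fn insert(&mut self, value: T) -> usize {
        match self.free.pop() {
            // Prefer a previously used token so the vector stays small.
            Some(token) => {
                self.slots[token] = Some(value);
                token
            }
            // Otherwise grow the vector; the new index is the new token.
            None => {
                self.slots.push(Some(value));
                self.slots.len() - 1
            }
        }
    }

    /// Removes and returns the value stored under `token`, if any, and makes
    /// the token available for reuse.
    pub fn take(&mut self, token: usize) -> Option<T> {
        let value = self.slots.get_mut(token)?.take();
        if value.is_some() {
            self.free.push(token);
        }
        value
    }
}
```

In the runtime, insert would correspond to what await does before registering with mio, and take to looking up the stack to resume when an event with that token arrives.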
We'll have to modify our yield thread to poll mio if there are no waiting
threads. The unreachable!() executed when there are no waiting threads is replaced with
MIO_EVENTS.with(|events| {
    // Poll mio for events (blocking until something is ready).
    MIO_POLL.with(|poll|
        poll.borrow().poll(&mut *events.borrow_mut(), None).expect("mio poll failed")
    );
    // For each event, switch to its thread in turn. Notice that we will stay in this
    // portion of the code until we have handled every single event. (This is why
    // implementing this as a separate stack was so convenient).
    for event in events.borrow().iter() {
        let thread = get_stack(event);
        MIO_ACTIVE.with(|active| active.set(active.get() - 1));
        let ptr = YIELD_THREAD.with(|it| it.get());
        unsafe{ primitives::unsafe_switch(thread, ptr) };
    }
});
There's one remaining problem, and that is that if code with blocking coroutines
returns from main, it will instantly terminate instead of finishing those
coroutines. So we make a function that waits for all coroutines to be done
before exiting.
pub fn run() {
    let mut finished = false;
    loop {
        // Check if there are any waiting or blocked threads
        WAITING_THREADS.with(|threads| MIO_ACTIVE.with(|active|
            finished = active.get() == 0 && threads.borrow().is_empty()
        ));
        // If not, return
        if finished {
            break;
        }
        // Otherwise go run one of them (waiting for one to unblock if necessary).
        unsafe {
            let thread = YIELD_THREAD.with(|it| ptr::read(it.get()));
            primitives::unsafe_switch(thread, new_waiting());
        }
    }
}
8 Echo server example
I think this mostly stands on its own. The only thing I would note is that if
you were serious about using this library you should wrap TcpStream and such
in structs that automatically call await instead of calling it manually.
extern crate coop;
extern crate mio;

use coop::coroutine as co;
use mio::tcp::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::io::{Read, Write};

fn echo(mut stream: TcpStream, _addr: SocketAddr) {
    let mut buf = [0u8; 2048];
    let mut used = 0;
    loop {
        if used == 0 {
            co::await(&stream, mio::Ready::readable());
            used = stream.read(&mut buf[used..]).expect("Failed to read from stream");
            if used == 0 {
                break;
            }
        } else {
            co::await(&stream, mio::Ready::writable());
            let res = stream.write(&buf[..used]).expect("Failed to write to stream");
            // Manual memmove, don't know why this isn't in std.
            for i in 0..used - res {
                buf[i] = buf[res + i];
            }
            used -= res;
        }
    }
}

fn main() {
    let addr = "127.0.0.1:13266".parse().unwrap();
    let server = TcpListener::bind(&addr).unwrap();

    loop {
        co::await(&server, mio::Ready::readable());
        match server.accept() {
            Ok((stream, addr)) => {
                co::start(move || echo(stream, addr));
            }
            Err(e) => {
                println!("Error accepting: {:?}", e);
            }
        }
    }
}
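On the manual memmove in echo: newer versions of the standard library provide copy_within on slices, which copies a possibly overlapping range within the same slice. A small sketch of how the buffer compaction could be written with it follows; consume_front is a helper name made up for this example.

```rust
/// Drops the first `written` bytes out of the `used` valid bytes at the
/// front of `buf`, sliding the remainder down to index 0. Returns the new
/// number of valid bytes.
pub fn consume_front(buf: &mut [u8], used: usize, written: usize) -> usize {
    assert!(written <= used && used <= buf.len());
    // memmove-style copy: the source and destination ranges may overlap.
    buf.copy_within(written..used, 0);
    used - written
}
```

In echo this would replace the inner for loop and the subtraction with `used = consume_front(&mut buf, used, res);`.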
9 Remaining work, issues, and questions
- The Linux kernel will apparently free pages past rsp. Does this imply that having rsp point to the heap is going to cause it to allocate as large a stack as possible, or is that done lazily (seems probable)? Does this mean that using a stack-allocated stack for a coroutine will cause undefined behaviour as the kernel unmaps the stack of the main thread past the coroutine's stack?
- Obviously I've written a huge amount of unsafe code here. (And this is entirely reasonable: I'm interacting with assembly, which the compiler cannot reason about, and I'm implementing new forms of control flow, which the compiler also can't reason about.) I think the high level APIs I've exposed are safe to use, but I'm not entirely sure. The lifetimes of the closures are required to be 'static, so any variables referenced in them will live forever. Generators can't be sent across threads, and there's nothing to send with coroutines, so there should be no way to break the expectations of Send and Sync. We catch panics, so they shouldn't cross the FFI boundary into assembly. The most obvious potential issue is that sending something from one stack to another might break expectations about lifetimes, Send, or Sync somehow, but I haven't thought of a way how.
- If anyone is actually interested in using this code the assembly should be ported to the other platforms. It shouldn't be hard, it just needs to be done.
- Without segmented stacks we are stuck allocating reasonable sized stacks and hoping we don't overflow them. This increases memory requirements and creates a chance of crashing in mysterious ways due to stack overflows. Unfortunately I'm pretty sure it's not possible to fix this without compiler support.
- Would this actually be useful for anyone if it was polished into something production ready? How does it compare to existing libraries? I've only given them a cursory glance.
Footnotes:
rsp, allowing small functions that don't call other functions to not increment
the stack pointer