Sunday, March 16, 2014

Select over multiple Rust Channels


/* ---[ Channels in Rust ]--- */

The channel paradigm for CSP-based concurrency has received a lot of attention lately: it is the foundational concurrency paradigm in Go, and Clojure has embraced it with core.async. It turns out that Rust, the new language from Mozilla, also fully embraces channel-based, message-passing concurrency.

Both Go and Clojure's core.async have a select operation that allows your code to wait on multiple channels and respond to the first one that is ready. This is based, at least conceptually, on the Unix select system call that monitors multiple file descriptors.

Rust also has a select operation. And it has a select! macro to make using it easier. Here's an example:

use std::io::Timer;

fn use_select_macro() {
    let (ch, pt): (Sender<~str>, Receiver<~str>) = channel();

    let mut timer = Timer::new().unwrap();
    let timeout = timer.oneshot(1000);
    select! (
        s = pt.recv() => println!("{:s}", s),
        () = timeout.recv() => println!("timed out!")
    );
}

Channels and Ports are now called Senders and Receivers in Rust. As with select in Go, if a message arrives on the Receiver called pt before the 1 second timer goes off, its code block will execute. Otherwise, the timer's Receiver will be read from and its code block executed, printing "timed out!".
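For readers on a current stable toolchain, where the select! macro and std::io::Timer shown above are not available, the same "message or timeout, whichever comes first" behaviour can be approximated with Receiver::recv_timeout from std::sync::mpsc. This is only a sketch of the idea, not the API used in this post; the helper name recv_or_timeout and the "sender dropped" message are made up for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

// Hypothetical helper (not part of the original post): wait up to `wait`
// for one message and report which case happened.
fn recv_or_timeout(rx: &Receiver<String>, wait: Duration) -> String {
    match rx.recv_timeout(wait) {
        // A message arrived before the deadline.
        Ok(s) => s,
        // The deadline passed with no message.
        Err(RecvTimeoutError::Timeout) => "timed out!".to_string(),
        // Every Sender was dropped, so no message can ever arrive.
        Err(RecvTimeoutError::Disconnected) => "sender dropped".to_string(),
    }
}

fn main() {
    let (tx, rx) = channel::<String>();

    // Nothing has been sent yet, so this call reports the timeout case.
    println!("{}", recv_or_timeout(&rx, Duration::from_millis(50)));

    // Once a message is queued, the same call returns it instead.
    tx.send("hello".to_string()).unwrap();
    println!("{}", recv_or_timeout(&rx, Duration::from_millis(50)));
}
```

Unlike select, recv_timeout only watches a single Receiver, so it covers the timeout case but not waiting on several channels at once.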

Note that the select! macro uses parens, like a function call, not curly braces like a code block.

The select! macro is currently labelled experimental, since it has some limitations. One I hit this week is that it will fail (as in, not compile) if you embed the Receiver in a struct:


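// Assumed definition: the post does not show struct A, so this is
// reconstructed from how it is used below.
struct A {
    c: Sender<~str>,
    p: Receiver<~str>
}

fn does_not_compile() {
    let (ch, pt): (Sender<~str>, Receiver<~str>) = channel();
    let a = A{c: ch, p: pt};

    let mut timer = Timer::new().unwrap();
    let timeout = timer.oneshot(1000);
    select! (
        s = a.p.recv() => println!("{:s}", s),
        () = timeout.recv() => println!("timed out!")
    );
}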

This fails with error: no rules expected the token '.'. I've filed an issue for it here: https://github.com/mozilla/rust/issues/12902#issuecomment-37714663


/* ---[ Using the Rust Channel Select API ]--- */

The workaround is to use the Select API directly. Here's how you do it:


use std::comm::Select;
use std::io::Timer;

fn select_from_struct() {
    let (ch, pt): (Sender<~str>, Receiver<~str>) = channel();    
    let mut timer = Timer::new().unwrap();
    let timeout = timer.oneshot(1000);

    let a = A{c: ch, p: pt};

    let sel = Select::new();
    let mut pt = sel.handle(&a.p);
    let mut timeout = sel.handle(&timeout);
    unsafe { pt.add(); timeout.add(); }
    let ret = sel.wait();

    if ret == pt.id() {
        let s = pt.recv();
        println!("ss: {:?}", s);
    } else if ret == timeout.id() {
        let () = timeout.recv();
        println!("TIMEOUT!!");
    }
}

It's a little more code, but fairly straightforward to follow. You wrap your Receivers in a select Handle and then add them to the Receiver set via the add method (which must be wrapped in an unsafe block). Each handle gets an id so you can discover which one returned first.

Finally, you wait. When the wait returns, you check the returned id and execute the appropriate block of code, which starts by calling recv on the Receiver to get the incoming value (if any).
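The "find out which source fired first" idea can also be sketched on a current stable toolchain, whose standard library does not include the Select type used above. The sketch below swaps in a different technique, fan-in: every source sends into one shared channel, and an enum tag plays the role of the handle id. The names Event and describe are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;

// Hypothetical tag type: stands in for the handle ids returned by wait().
#[derive(Debug)]
enum Event {
    Message(String),
    Timeout,
}

// Block until any source sends, then branch on which one it was.
fn describe(rx: &Receiver<Event>) -> String {
    match rx.recv() {
        Ok(Event::Message(s)) => format!("ss: {}", s),
        Ok(Event::Timeout) => "TIMEOUT!!".to_string(),
        Err(_) => "all senders dropped".to_string(),
    }
}

fn main() {
    let (tx, rx) = channel::<Event>();

    // A timer thread acts as the second source, sending a tagged event
    // into the same channel after 100 ms.
    let timer_tx = tx.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        let _ = timer_tx.send(Event::Timeout);
    });

    // No Message is ever sent here, so the timer event is the first to arrive.
    println!("{}", describe(&rx));

    // Keep the message-side Sender alive until after the wait.
    drop(tx);
}
```

The trade-off is that every source has to agree on one message type, whereas the Select API in this post lets each Receiver keep its own.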
