Crate chan

This crate provides an implementation of a multi-producer, multi-consumer channel. Channels come in three varieties:

  1. Asynchronous channels. Sends never block. The buffer is limited only by the resources available on the system.
  2. Synchronous buffered channels. Sends block when the buffer is full. The buffer is depleted by receiving on the channel.
  3. Rendezvous channels (synchronous channels without a buffer). Sends block until a receive has consumed the value sent. When a sender and receiver synchronize, they are said to rendezvous.

Asynchronous channels are created with chan::async(). Synchronous channels are created with chan::sync(k) where k is the buffer size. Rendezvous channels are created with chan::sync(0).
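
The difference between the varieties is easiest to see when nothing is receiving. The sketch below is not chan code: it assumes the standard library's std::sync::mpsc::channel as a stand-in for an asynchronous channel, so that the example has no dependencies. The function name buffer_without_receiving is hypothetical.

```rust
use std::sync::mpsc;

/// Sends `n` values before anything is received, then counts what arrives.
/// Uses std's unbounded channel as a stand-in for an asynchronous channel.
fn buffer_without_receiving(n: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    for i in 0..n {
        // Never blocks: the buffer simply grows.
        tx.send(i).expect("receiver is still alive");
    }
    // Dropping the only sender closes the channel so the iterator below ends.
    drop(tx);
    rx.iter().count()
}
```

Calling buffer_without_receiving(1000) completes on a single thread. The same loop against a bounded channel whose capacity is smaller than n would block on the first send that finds the buffer full.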

All channels are split into the same two types upon creation: a Sender and a Receiver. Additional senders and receivers can be created with reckless abandon by calling clone.

When all senders are dropped, the channel is closed and no further sends are possible. In a channel with a buffer, receivers continue to consume values until the buffer is empty, at which point every receive returns None immediately.
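
The drain-then-close order can be sketched with the standard library, which behaves the same way in this respect. This is an illustration under assumptions, not chan code: std::sync::mpsc::sync_channel stands in for a buffered channel, `.ok()` maps std's Result onto the Option shape described above, and drain_after_close is a hypothetical name.

```rust
use std::sync::mpsc;

/// Fills a buffered channel, drops the only sender, then receives until
/// the channel reports that it is closed and empty.
fn drain_after_close(values: Vec<i32>) -> Vec<Option<i32>> {
    // Capacity equals the number of values, so no send blocks.
    let (tx, rx) = mpsc::sync_channel(values.len());
    for v in values {
        tx.send(v).expect("receiver is still alive");
    }
    drop(tx); // last sender gone: the channel is now closed

    let mut seen = Vec::new();
    loop {
        // Buffered values are still delivered after the close.
        let item = rx.recv().ok();
        let closed = item.is_none();
        seen.push(item);
        if closed {
            break;
        }
    }
    seen
}
```

For the input vec![1, 2] the function returns the two buffered values followed by a single None.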

No special semantics are enforced when all receivers are dropped. Asynchronous sends will continue to work. Synchronous sends will block indefinitely when the buffer is full. A send on a rendezvous channel will also block indefinitely. (NOTE: This could be changed!)

All channels satisfy both Send and Sync and can be freely mixed in chan_select!. Said differently, the synchronization semantics of a channel are encoded upon construction, but are otherwise indistinguishable to the type system.

Values sent on channels are subject to the normal restrictions Rust has on values crossing thread boundaries. That is, values must implement Send and/or Sync. (An Rc<T> cannot be sent on a channel, but a channel can be sent on a channel!)
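
These bounds are checked at compile time, which a small helper can make visible. The sketch below uses only the standard library: std::sync::mpsc::Sender stands in for a channel endpoint (an assumption, since chan's own types are not used here), and assert_send and check_send_bounds are hypothetical names.

```rust
use std::sync::mpsc::Sender;
use std::sync::Arc;

/// Compiles only when `T` may cross a thread boundary.
fn assert_send<T: Send>() {}

/// Each call below is a compile-time check; nothing happens at run time.
fn check_send_bounds() -> bool {
    assert_send::<i32>();
    assert_send::<Arc<String>>();
    // A channel endpoint is itself `Send`, so it can travel over a channel.
    assert_send::<Sender<Sender<i32>>>();
    // Uncommenting the next line is a compile error: `Rc` is not `Send`.
    // assert_send::<std::rc::Rc<String>>();
    true
}
```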

Example: rendezvous channel

A simple example demonstrating a rendezvous channel:

use std::thread;

let (send, recv) = chan::sync(0);
thread::spawn(move || send.send(5));
assert_eq!(recv.recv(), Some(5)); // blocks until the previous send occurs

Example: synchronous channel

Similarly, an example demonstrating a synchronous channel:

let (send, recv) = chan::sync(1);
send.send(5); // doesn't block because of the buffer
assert_eq!(recv.recv(), Some(5));

Example: multiple producers and multiple consumers

An example demonstrating multiple consumers and multiple producers:

use std::thread;

let r = {
    let (s, r) = chan::sync(0);
    for letter in vec!['a', 'b', 'c', 'd'] {
        let s = s.clone();
        thread::spawn(move || {
            for _ in 0..10 {
                s.send(letter);
            }
        });
    }
    // This extra lexical scope will drop the initial
    // sender we created. Thus, the channel will be
    // closed when all threads spawned above have completed.
    r
};

// A wait group lets us synchronize the completion of multiple threads.
let wg = chan::WaitGroup::new();
for _ in 0..4 {
    wg.add(1);
    let wg = wg.clone();
    let r = r.clone();
    thread::spawn(move || {
        for letter in r {
            println!("Received letter: {}", letter);
        }
        wg.done();
    });
}

// If this was the end of the process and we didn't call `wg.wait()`, then
// the process might quit before all of the consumers were done.
// `wg.wait()` will block until all `wg.done()` calls have finished.
wg.wait();
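
For readers curious how a wait group can work, here is a minimal sketch built from a counter and a condition variable. It is not chan's implementation (which is not shown in this document); SketchWaitGroup and run_workers are hypothetical names, and only the add/done/wait shape is taken from the example above.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical wait group: a shared counter plus a condition variable.
#[derive(Clone)]
struct SketchWaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl SketchWaitGroup {
    fn new() -> Self {
        SketchWaitGroup { inner: Arc::new((Mutex::new(0), Condvar::new())) }
    }

    /// Registers `n` more units of outstanding work.
    fn add(&self, n: usize) {
        *self.inner.0.lock().unwrap() += n;
    }

    /// Marks one unit of work as finished and wakes waiters at zero.
    fn done(&self) {
        let mut count = self.inner.0.lock().unwrap();
        *count -= 1;
        if *count == 0 {
            self.inner.1.notify_all();
        }
    }

    /// Blocks until the counter reaches zero.
    fn wait(&self) {
        let mut count = self.inner.0.lock().unwrap();
        while *count > 0 {
            count = self.inner.1.wait(count).unwrap();
        }
    }
}

/// Spawns `n` threads and returns how many had finished when `wait` returned.
fn run_workers(n: usize) -> usize {
    let wg = SketchWaitGroup::new();
    let finished = Arc::new(Mutex::new(0usize));
    for _ in 0..n {
        wg.add(1); // register before spawning, as in the example above
        let wg = wg.clone();
        let finished = Arc::clone(&finished);
        thread::spawn(move || {
            *finished.lock().unwrap() += 1;
            wg.done();
        });
    }
    wg.wait();
    let total = *finished.lock().unwrap();
    total
}
```

Calling add before spawning matters: if a worker registered itself from inside its own thread, wait could observe a zero counter and return before that worker had started.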

Example: Select on multiple channel sends/receives

An example showing how to use chan_select! to synchronize on sends or receives.

#[macro_use]
extern crate chan;

use std::thread;

// Emits the fibonacci sequence on the given channel until `quit` receives
// a sentinel value.
fn fibonacci(s: chan::Sender<u64>, quit: chan::Receiver<()>) {
    let (mut x, mut y) = (0, 1);
    loop {
        // Select will block until at least one of `s.send` or `quit.recv`
        // is ready to succeed. At which point, it will choose exactly one
        // send/receive to synchronize.
        chan_select! {
            s.send(x) => {
                let oldx = x;
                x = y;
                y = oldx + y;
            },
            quit.recv() => {
                println!("quit");
                return;
            }
        }
    }
}

fn main() {
    let (s, r) = chan::sync(0);
    let (qs, qr) = chan::sync(0);
    // Spawn a thread and ask for the first 10 numbers in the fibonacci
    // sequence.
    thread::spawn(move || {
        for _ in 0..10 {
            println!("{}", r.recv().unwrap());
        }
        // Dropping all sending channels causes the receive channel to
        // immediately and always synchronize (because the channel is closed).
        drop(qs);
    });
    fibonacci(s, qr);
}

Example: non-blocking sends/receives

This crate specifically does not expose methods like try_send or try_recv. Instead, you should prefer using chan_select! to perform a non-blocking send or receive. This can be done by telling select what to do when no synchronization events are available.

let (s, _) = chan::sync(0);
chan_select! {
    default => println!("Send failed."),
    s.send("some data") => println!("Send succeeded."),
}

When chan_select! first runs, it will check if s.send(...) can succeed without blocking. If so, chan_select! will permit the channels to rendezvous. However, if there is no recv call to accept the send, then chan_select! will immediately execute the default arm.

Example: the sentinel channel idiom

When writing concurrent programs with chan, you will often find that you need to somehow "wait" until some operation is done. For example, let's say you want to run a function in a separate thread, but wait until it completes. Here's one way to do it:

use std::thread;

fn do_work(done: chan::Sender<()>) {
    // do something

    // signal that we're done.
    done.send(());
}

fn main() {
    let (sdone, rdone) = chan::sync(0);
    thread::spawn(move || do_work(sdone));
    // block until work is done, and then quit the program.
    rdone.recv();
}

In effect, we've created a new channel that sends unit values. When we're done doing work, we send a unit value and main waits for it to be delivered.

Another way of achieving the same thing is to simply close the channel. Once the channel is closed, any previously blocked receive operations become immediately unblocked. What's even cooler is that channels are closed automatically when all senders are dropped. So the new program looks something like this:

use std::thread;

fn do_work(_done: chan::Sender<()>) {
    // do something
}

fn main() {
    let (sdone, rdone) = chan::sync(0);
    thread::spawn(move || do_work(sdone));
    // block until work is done, and then quit the program.
    rdone.recv();
}

We no longer need to explicitly do anything with the done channel. We give do_work ownership of the channel, but as soon as the function stops executing, done is dropped, the channel is closed and rdone.recv() unblocks.
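
The same idea extends to several workers: hand each one a clone of the sender, and the receive unblocks only after the last clone is dropped. The sketch below assumes the standard library's std::sync::mpsc::sync_channel as a stand-in for chan::sync (std reports the close as an Err rather than None), and wait_for_all is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Starts `workers` threads and waits for all of them by waiting for the
/// done channel to close. Returns how many workers had finished by then.
fn wait_for_all(workers: usize) -> usize {
    let (sdone, rdone) = mpsc::sync_channel::<()>(0);
    let finished = Arc::new(AtomicUsize::new(0));
    for _ in 0..workers {
        let sdone = sdone.clone();
        let finished = Arc::clone(&finished);
        thread::spawn(move || {
            // do something
            finished.fetch_add(1, Ordering::SeqCst);
            // Dropping this clone is the only "signal" the worker gives.
            drop(sdone);
        });
    }
    // Drop the original so that only the workers hold senders.
    drop(sdone);
    // No value is ever sent, so this returns only once every clone is gone.
    let closed = rdone.recv().is_err();
    assert!(closed);
    finished.load(Ordering::SeqCst)
}
```

Forgetting to drop the original sender in the spawning thread is the usual mistake with this pattern: the channel then never closes and the receive blocks forever.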

Example: I want more!

There are some examples in this crate's repository: https://github.com/BurntSushi/chan/tree/master/examples

Here is a nice example using the chan-signal crate to read lines from stdin while gracefully quitting after receiving an INT or TERM signal: https://github.com/BurntSushi/chan-signal/blob/master/examples/read_names.rs

A non-trivial program for periodically sending email with the output of running a command: https://github.com/BurntSushi/rust-cmail (The source is commented more heavily than normal.)

When are channel operations non-blocking?

Non-blocking in this context means "a send/recv operation can synchronize immediately." (Under the hood, a mutex may still be acquired, which could block.)

The following is a list of all cases where a channel operation is considered non-blocking:

Non-blocking semantics are important because they affect the behavior of chan_select!. In particular, a chan_select! with a default arm will execute the default case if and only if all other operations are blocked.
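
As a rough illustration of what "can synchronize immediately" means for a buffered channel, the sketch below probes a bounded channel with the standard library's try_send. This is std::sync::mpsc, not chan (which deliberately exposes no such method), and immediate_sends is a hypothetical name.

```rust
use std::sync::mpsc::{self, TrySendError};

/// For a channel of capacity `cap` that nobody receives from, reports
/// whether each of `attempts` consecutive sends could complete immediately.
fn immediate_sends(cap: usize, attempts: usize) -> Vec<bool> {
    // `_rx` is kept alive so the channel stays open for the whole probe.
    let (tx, _rx) = mpsc::sync_channel(cap);
    (0..attempts)
        .map(|i| match tx.try_send(i) {
            // Room in the buffer: the send is non-blocking.
            Ok(()) => true,
            // Buffer full: a plain send would have blocked here.
            Err(TrySendError::Full(_)) => false,
            // Not reachable here because `_rx` is still alive.
            Err(TrySendError::Disconnected(_)) => false,
        })
        .collect()
}
```

With a capacity of 2 and three attempts, the first two sends fit in the buffer and the third does not. In a chan_select! with a default arm, that third attempt is the situation in which the default arm would run.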

Which channel type should I use?

From Ken Kahn:

About 25 years ago I went to dinner with Carl Hewitt and Robin Milner (of CCS and pi calculus fame) and they were arguing about synchronous vs. asynchronous communication primitives. Carl used the post office metaphor while Robin used the telephone. Both quickly admitted that one can implement one in the other.

With three channel types to choose from, it may not always be clear which one you should use. In fact, there has been a long debate over which is better. Here are some rough guidelines:

Warning: leaks

Channels can be leaked! In particular, if all receivers have been dropped, then any future sends will block. Usually this is indicative of a bug in your program.

For example, consider a "generator" style pattern where a thread produces values on a channel and another thread consumes in an iterator.

use std::thread;

let (s, r) = chan::sync(0);

thread::spawn(move || {
    for val in r {
        if val >= 2 {
            break;
        }
    }
});

s.send(1);
s.send(2);
// This will deadlock because the loop in the thread
// above quits after receiving `2`.
s.send(3);

If the iterator loop quits early, the channel's buffer could fill up, which will indefinitely block all future send operations.

(These leaks/deadlocks are detectable in most circumstances, and a send operation could be made to wake up and either return an error or panic. The semantics here are still experimental.)
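
For a concrete picture of the "return an error" alternative, the standard library's channels already behave that way: a send fails once the receiver is gone. The sketch below reruns the generator pattern with std::sync::mpsc::sync_channel instead of chan; sends_until_receiver_quits is a hypothetical name, and the join is added only to make the outcome deterministic.

```rust
use std::sync::mpsc;
use std::thread;

/// Reports, for each of three sends, whether it was delivered.
fn sends_until_receiver_quits() -> Vec<bool> {
    let (s, r) = mpsc::sync_channel(0);

    let consumer = thread::spawn(move || {
        for val in r {
            if val >= 2 {
                break;
            }
        }
        // The receiver is dropped when the loop ends.
    });

    let first = s.send(1).is_ok();
    let second = s.send(2).is_ok();
    // Wait for the consumer to exit so the receiver is certainly gone.
    consumer.join().unwrap();
    // With std, this returns an error instead of blocking forever.
    let third = s.send(3).is_ok();
    vec![first, second, third]
}
```

The first two sends are delivered and the third reports failure, so the producer can stop instead of deadlocking.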

Warning: more leaks

It will always be possible to leak a channel in safe code regardless of the channel's semantics. For example:

use std::mem::forget;

let (s, r) = chan::sync::<()>(0);
forget(s);
// Blocks forever because the channel is never closed.
r.recv();

In this case, it is impossible for the channel to close because the internal reference count will never reach 0.
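
The effect can be observed without hanging a program by bounding the wait. The sketch below uses the standard library's std::sync::mpsc and its recv_timeout, not chan, on the assumption that the leak behaves the same way there; recv_after_forgetting_sender is a hypothetical name and the 50 ms timeout is arbitrary.

```rust
use std::mem::forget;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Leaks the only sender, then waits briefly for a value or a close.
fn recv_after_forgetting_sender() -> Result<(), RecvTimeoutError> {
    let (s, r) = mpsc::sync_channel::<()>(0);
    // The sender's destructor never runs, so the channel never closes.
    forget(s);
    // Neither a value nor a disconnect can arrive; only the timeout fires.
    r.recv_timeout(Duration::from_millis(50))
}
```

The result is a timeout rather than a disconnect error. Had the sender been dropped normally, the receive would have reported the disconnect at once.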

Warning: performance

The primary purpose of this crate is to provide a safe, concurrent abstraction. Notably, it is not a zero-cost abstraction. It is not even a near-zero-cost abstraction. Throughput on a channel is startlingly low (see the benchmarks in this crate's repository). Therefore, the channels provided in this crate are most useful as a means to structure concurrent programs at a coarse level.

If your requirements call for performant synchronization of data, chan is not the crate you're looking for.

Prior art

The semantics encoded in the channels provided by this crate should mirror or closely mirror the semantics provided by channels in Go. This includes select statements! The major difference between concurrent programs written with chan and concurrent programs written with Go is that Go programs can benefit from being fast and loose with creating goroutines. In chan, each "goroutine" is just an OS thread.

In terms of writing code:

  1. Go programs will feature explicit closing of channels. In chan, channels are closed only when all senders have been dropped.
  2. Since there is no such thing as a "nil" channel in chan, the semantics Go has for nil channels (both sends and receives block indefinitely) do not exist in chan.
  3. chan does not expose len or cap methods. (For no reason other than to start with a totally minimal API. In particular, calling len or cap on a channel is often The Wrong Thing. But not always. So this restriction will probably be lifted.)
  4. In chan, all channels are either senders or receivers. There is no "bidirectional" channel. This is manifest in how channel memory is managed: channels are closed when all senders are dropped.

Of course, Go is not the origin of these ideas, but it has been the strongest influence on the design of this library, and at least one of its authors has done substantial research on the integration of CSP and programming languages.

Macros

chan_select!

Synchronize on at most one channel send or receive operation.

Structs

Iter

An iterator over values received in a channel.

Receiver

The receiving half of a channel.

Sender

The sending half of a channel.

WaitGroup

WaitGroup provides synchronization on the completion of threads.

Functions

after_ms

Creates a new rendezvous channel that is dropped after a timeout.

async

Create an asynchronous channel with an unbounded buffer.

sync

Create a synchronous channel with a possibly empty buffer.

tick_ms

Creates a new rendezvous channel that is "ticked" every duration.