Crate chan
This crate provides an implementation of a multi-producer, multi-consumer channel. Channels come in three varieties:
- Asynchronous channels. Sends never block. The buffer is limited only by the resources available on the system.
- Synchronous buffered channels. Sends block when the buffer is full. The buffer is depleted by receiving on the channel.
- Rendezvous channels (synchronous channels without a buffer). Sends block until a receive has consumed the value sent. When a sender and receiver synchronize, they are said to rendezvous.
Asynchronous channels are created with chan::async(). Synchronous channels are created with chan::sync(k), where k is the buffer size. Rendezvous channels are created with chan::sync(0).
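Outside this crate, the standard library's std::sync::mpsc offers rough analogues of these three constructors: mpsc::channel() is unbounded, mpsc::sync_channel(k) buffers up to k values, and mpsc::sync_channel(0) is a rendezvous channel. The sketch below uses std only, so it illustrates the buffering rules rather than chan's API, and the helper names are hypothetical. It counts how many non-blocking sends each kind of channel accepts while nothing is receiving.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper (std, not chan): counts how many of `attempts`
/// non-blocking sends a bounded channel of capacity `k` accepts while
/// nothing is receiving. With `k == 0` (rendezvous) the answer is 0.
fn accepted_without_receiver(k: usize, attempts: usize) -> usize {
    // Keep `_r` alive so the channel stays open for the whole function.
    let (s, _r) = mpsc::sync_channel::<usize>(k);
    let mut accepted = 0;
    for i in 0..attempts {
        match s.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer full (or, for k == 0, no receiver waiting):
            // a plain `send` would block at this point.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    accepted
}

/// Hypothetical helper: an unbounded std channel accepts every send
/// without blocking, and all values stay buffered until received.
fn unbounded_sends(n: usize) -> usize {
    let (s, r) = mpsc::channel::<usize>();
    for i in 0..n {
        s.send(i).expect("receiver is still alive");
    }
    // Close the channel so the iterator below terminates.
    drop(s);
    r.iter().count()
}
```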
All channels are split into the same two types upon creation: a Sender and a Receiver. Additional senders and receivers can be created with reckless abandon by calling clone.
When all senders are dropped, the channel is closed and no other sends are possible. In a channel with a buffer, receivers continue to consume values until the buffer is empty, at which point a None value is always returned immediately.
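A minimal sketch of this closing rule, using std::sync::mpsc as a stand-in (an assumption for illustration: std reports a closed, empty channel as Err(RecvError) where chan returns None). Values already buffered are still delivered after the last sender is dropped, and every receive after that returns immediately. The helper name is hypothetical.

```rust
use std::sync::mpsc;

/// Hypothetical helper (std, not chan): fills a buffered channel, drops
/// the only sender, then receives until the channel reports it is closed.
fn drain_after_close(values: &[i32]) -> (Vec<i32>, bool) {
    let (s, r) = mpsc::sync_channel(values.len());
    for &v in values {
        s.send(v).expect("buffer has room and the receiver is alive");
    }
    // Dropping the last sender closes the channel.
    drop(s);

    let mut received = Vec::new();
    // Buffered values are still delivered after the close...
    while let Ok(v) = r.recv() {
        received.push(v);
    }
    // ...and once the buffer is empty, every recv returns Err at once
    // (chan returns None in the same situation).
    let still_closed = r.recv().is_err();
    (received, still_closed)
}
```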
No special semantics are enforced when all receivers are dropped. Asynchronous sends will continue to work. Synchronous sends will block indefinitely when the buffer is full. A send on a rendezvous channel will also block indefinitely. (NOTE: This could be changed!)
All channels satisfy both Send and Sync and can be freely mixed in chan_select!. Said differently, the synchronization semantics of a channel are encoded upon construction, but are otherwise indistinguishable to the type system.
Values sent on channels are subject to the normal restrictions Rust has on values crossing thread boundaries. That is, values must implement Send. (An Rc<T> cannot be sent on a channel, but a channel can be sent on a channel!)
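Sending a channel on a channel is a common way to give a worker a private reply channel. The sketch below shows it with std::sync::mpsc rather than chan (an assumption made so the example needs no external crate), and reply_over_channel is a hypothetical name. It compiles because Sender<String> implements Send; a channel carrying Rc<T> values could not be moved into the spawned thread.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical example (std, not chan): the first channel carries
/// `Sender<String>` values, so a requester can hand the worker a
/// private channel to reply on.
fn reply_over_channel() -> String {
    let (req_s, req_r) = mpsc::channel::<Sender<String>>();
    let worker = thread::spawn(move || {
        // Receive a channel, then answer on it.
        let reply_to = req_r.recv().expect("a reply channel was sent");
        reply_to.send(String::from("pong")).expect("requester is alive");
    });

    let (reply_s, reply_r) = mpsc::channel();
    // Send the sending half of the reply channel across threads.
    req_s.send(reply_s).expect("worker is alive");
    let answer = reply_r.recv().expect("worker replied");
    worker.join().expect("worker did not panic");
    answer
}
```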
Example: rendezvous channel
A simple example demonstrating a rendezvous channel:
```rust
use std::thread;

let (send, recv) = chan::sync(0);
thread::spawn(move || send.send(5));
assert_eq!(recv.recv(), Some(5)); // blocks until the previous send occurs
```
Example: synchronous channel
Similarly, an example demonstrating a synchronous channel:
```rust
let (send, recv) = chan::sync(1);
send.send(5); // doesn't block because of the buffer
assert_eq!(recv.recv(), Some(5));
```
Example: multiple producers and multiple consumers
An example demonstrating multiple consumers and multiple producers:
```rust
use std::thread;

let r = {
    let (s, r) = chan::sync(0);
    for letter in vec!['a', 'b', 'c', 'd'] {
        let s = s.clone();
        thread::spawn(move || {
            for _ in 0..10 {
                s.send(letter);
            }
        });
    }
    // This extra lexical scope will drop the initial
    // sender we created. Thus, the channel will be
    // closed when all threads spawned above have completed.
    r
};

// A wait group lets us synchronize the completion of multiple threads.
let wg = chan::WaitGroup::new();
for _ in 0..4 {
    wg.add(1);
    let wg = wg.clone();
    let r = r.clone();
    thread::spawn(move || {
        for letter in r {
            println!("Received letter: {}", letter);
        }
        wg.done();
    });
}

// If this were the end of the process and we didn't call `wg.wait()`, then
// the process might quit before all of the consumers were done.
// `wg.wait()` will block until all `wg.done()` calls have finished.
wg.wait();
```
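To make the role of a wait group concrete, here is a hypothetical, minimal one built from std's Mutex and Condvar. It is a sketch of the idea only, not chan::WaitGroup's implementation; its usize counter is an assumption and chan's own method signatures may differ. add raises a counter, done lowers it, and wait blocks until the counter reaches zero.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical minimal wait group: a shared counter plus a condition
/// variable that wakes waiters when the counter drops to zero.
#[derive(Clone)]
struct WaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl WaitGroup {
    fn new() -> Self {
        WaitGroup { inner: Arc::new((Mutex::new(0), Condvar::new())) }
    }

    /// Registers `n` more pieces of outstanding work.
    fn add(&self, n: usize) {
        let (count, _) = &*self.inner;
        *count.lock().unwrap() += n;
    }

    /// Marks one piece of work as finished; wakes all waiters at zero.
    fn done(&self) {
        let (count, cvar) = &*self.inner;
        let mut c = count.lock().unwrap();
        *c -= 1;
        if *c == 0 {
            cvar.notify_all();
        }
    }

    /// Blocks until the counter is zero (looping to handle spurious wakeups).
    fn wait(&self) {
        let (count, cvar) = &*self.inner;
        let mut c = count.lock().unwrap();
        while *c > 0 {
            c = cvar.wait(c).unwrap();
        }
    }
}

/// Usage: four threads add to a shared total; `wait` returns only after
/// every thread has called `done`, so the total is complete.
fn run_workers() -> usize {
    let total = Arc::new(Mutex::new(0));
    let wg = WaitGroup::new();
    for i in 1..=4 {
        wg.add(1);
        let wg = wg.clone();
        let total = Arc::clone(&total);
        thread::spawn(move || {
            *total.lock().unwrap() += i;
            wg.done();
        });
    }
    wg.wait();
    let sum = *total.lock().unwrap();
    sum
}
```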
Example: Select on multiple channel sends/receives
An example showing how to use chan_select! to synchronize on sends or receives.
```rust
#[macro_use]
extern crate chan;

use std::thread;

// Emits the fibonacci sequence on the given channel until `quit` receives
// a sentinel value.
fn fibonacci(s: chan::Sender<u64>, quit: chan::Receiver<()>) {
    let (mut x, mut y) = (0, 1);
    loop {
        // Select will block until at least one of `s.send` or `quit.recv`
        // is ready to succeed. At that point, it will choose exactly one
        // send/receive to synchronize.
        chan_select! {
            s.send(x) => {
                let oldx = x;
                x = y;
                y = oldx + y;
            },
            quit.recv() => {
                println!("quit");
                return;
            }
        }
    }
}

fn main() {
    let (s, r) = chan::sync(0);
    let (qs, qr) = chan::sync(0);
    // Spawn a thread and ask for the first 10 numbers in the fibonacci
    // sequence.
    thread::spawn(move || {
        for _ in 0..10 {
            println!("{}", r.recv().unwrap());
        }
        // Dropping all sending channels causes the receive channel to
        // immediately and always synchronize (because the channel is closed).
        drop(qs);
    });
    fibonacci(s, qr);
}
```
Example: non-blocking sends/receives
This crate specifically does not expose methods like try_send or try_recv. Instead, you should prefer using chan_select! to perform a non-blocking send or receive. This can be done by telling select what to do when no synchronization events are available.
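```rust
#[macro_use]
extern crate chan;

fn main() {
    // The receiver is dropped immediately, so nothing can accept this send.
    let (s, _) = chan::sync(0);
    chan_select! {
        default => println!("Send failed."),
        s.send("some data") => println!("Send succeeded."),
    }
}
```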
When chan_select! first runs, it will check if s.send(...) can succeed without blocking. If so, chan_select! will permit the channels to rendezvous. However, if there is no recv call to accept the send, then chan_select! will immediately execute the default arm.
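For comparison, the standard library does expose a non-blocking send, SyncSender::try_send, which succeeds only if the value can be handed off immediately. The sketch below (std, not chan; send_or_default is a hypothetical name) mimics a select with a default arm. One difference: in the chan example the receiver is dropped and the send can simply never proceed, whereas std reports a dropped receiver as a separate Disconnected error, which the sketch also maps to the default branch.

```rust
use std::sync::mpsc::{self, SyncSender, TrySendError};

/// Hypothetical helper (std, not chan): performs the send only if it can
/// complete right now, otherwise falls through to a "default" result.
fn send_or_default(s: &SyncSender<&'static str>, msg: &'static str) -> &'static str {
    match s.try_send(msg) {
        Ok(()) => "Send succeeded.",
        // Nobody is ready to receive (or the buffer is full): default arm.
        Err(TrySendError::Full(_)) => "Send failed.",
        // std reports a dropped receiver separately; treat it as a failure too.
        Err(TrySendError::Disconnected(_)) => "Send failed.",
    }
}
```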
Example: the sentinel channel idiom
When writing concurrent programs with chan, you will often find that you need to somehow "wait" until some operation is done. For example, let's say you want to run a function in a separate thread, but wait until it completes. Here's one way to do it:
```rust
use std::thread;

fn do_work(done: chan::Sender<()>) {
    // do something

    // signal that we're done.
    done.send(());
}

fn main() {
    let (sdone, rdone) = chan::sync(0);
    thread::spawn(move || do_work(sdone));
    // block until work is done, and then quit the program.
    rdone.recv();
}
```
In effect, we've created a new channel that sends unit values. When we're done doing work, we send a unit value and main waits for it to be delivered.
Another way of achieving the same thing is to simply close the channel. Once the channel is closed, any previously blocked receive operations become immediately unblocked. What's even cooler is that channels are closed automatically when all senders are dropped. So the new program looks something like this:
```rust
use std::thread;

fn do_work(_done: chan::Sender<()>) {
    // do something
}

fn main() {
    let (sdone, rdone) = chan::sync(0);
    thread::spawn(move || do_work(sdone));
    // block until work is done, and then quit the program.
    rdone.recv();
}
```
We no longer need to explicitly do anything with the done channel. We give do_work ownership of the channel, but as soon as the function stops executing, _done is dropped, the channel is closed and rdone.recv() unblocks.
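The same drop-to-close idiom can be sketched with std::sync::mpsc (an assumption for illustration; std's recv returns Err(RecvError) on a closed channel where chan returns None). The hypothetical do_work below never sends anything; returning from it drops the sender, and that alone wakes the blocked receiver.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical worker (std, not chan): owns the "done" sender and never
/// sends on it. Returning drops `_done`, which closes the channel.
fn do_work(_done: Sender<()>) {
    // Simulate some work.
    thread::sleep(Duration::from_millis(20));
}

/// Blocks until the worker finishes. Returns true when the wake-up came
/// from the channel closing rather than from a sent value.
fn wait_for_worker() -> bool {
    let (sdone, rdone) = mpsc::channel::<()>();
    thread::spawn(move || do_work(sdone));
    // std reports a closed channel as Err(RecvError); chan returns None.
    rdone.recv().is_err()
}
```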
Example: I want more!
There are some examples in this crate's repository: https://github.com/BurntSushi/chan/tree/master/examples
Here is a nice example using the chan-signal crate to read lines from stdin while gracefully quitting after receiving an INT or TERM signal:
https://github.com/BurntSushi/chan-signal/blob/master/examples/read_names.rs
A non-trivial program for periodically sending email with the output of running a command: https://github.com/BurntSushi/rust-cmail (The source is commented more heavily than normal.)
When are channel operations non-blocking?
Non-blocking in this context means "a send/recv operation can synchronize immediately." (Under the hood, a mutex may still be acquired, which could block.)
The following is a list of all cases where a channel operation is considered non-blocking:
- A send on a synchronous channel whose buffer is not full.
- A receive on a synchronous channel with a non-empty buffer.
- A send on an asynchronous channel.
- A rendezvous send or recv when a corresponding recv or send operation is already blocked, respectively.
- A receive on any closed channel.
Non-blocking semantics are important because they affect the behavior of chan_select!. In particular, a chan_select! with a default arm will execute the default case if and only if all other operations are blocked.
Which channel type should I use?
About 25 years ago I went to dinner with Carl Hewitt and Robin Milner (of CCS and pi calculus fame) and they were arguing about synchronous vs. asynchronous communication primitives. Carl used the post office metaphor while Robin used the telephone. Both quickly admitted that one can implement one in the other.
With three channel types to choose from, it may not always be clear which one you should use. In fact, there has been a long debate over which are better. Here are some rough guidelines:
- Historically, asynchronous channels have been associated with the actor model, which means they're a little out of place in a library inspired by communicating sequential processes. Nevertheless, an unconstrained buffer can be occasionally useful.
- Synchronous channels are useful because their stricter synchronization semantics can make it easier to reason about the flow of your program. In particular, with a rendezvous channel, one knows that a send unblocks only when a corresponding recv consumes the sent value. This makes it feel an awful lot like a function call!
Warning: leaks
Channels can be leaked! In particular, if all receivers have been dropped, then any future sends will block. Usually this is indicative of a bug in your program.
For example, consider a "generator" style pattern where a thread produces values on a channel and another thread consumes them with an iterator.
```rust
use std::thread;

let (s, r) = chan::sync(0);
thread::spawn(move || {
    for val in r {
        if val >= 2 {
            break;
        }
    }
});
s.send(1);
s.send(2);
// This will deadlock because the loop in the thread
// above quits after receiving `2`.
s.send(3);
```
If the iterator loop quits early, nothing will receive further values: a rendezvous send blocks right away, and a buffered channel blocks once its buffer fills up. Either way, all future send operations block indefinitely.
(These leaks/deadlocks are detectable in most circumstances, and a send operation could be made to wake up and either return an error or panic. The semantics here are still experimental.)
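For comparison, std::sync::mpsc already takes the error-returning route described above: a send on a channel whose receiver has been dropped returns Err instead of blocking forever. The sketch below rewrites the generator pattern against std rather than chan (generator_sends is a hypothetical name). It records whether each send succeeded, and the third send fails instead of deadlocking.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical std (not chan) version of the generator example.
/// Returns whether each of the three sends succeeded.
fn generator_sends() -> Vec<bool> {
    let (s, r) = mpsc::sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        for val in r.iter() {
            if val >= 2 {
                break;
            }
        }
        // `r` is dropped when this closure returns, disconnecting the channel.
    });
    // Sends 1 and 2 rendezvous with the consumer; send 3 has no receiver
    // left, so std wakes it (or rejects it) with an error.
    let results = vec![s.send(1).is_ok(), s.send(2).is_ok(), s.send(3).is_ok()];
    consumer.join().expect("consumer did not panic");
    results
}
```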
Warning: more leaks
It will always be possible to leak a channel in safe code regardless of the channel's semantics. For example:
```rust
use std::mem::forget;

let (s, r) = chan::sync::<()>(0);
forget(s);
// Blocks forever because the channel is never closed.
r.recv();
```
In this case, it is impossible for the channel to close because the internal reference count will never reach 0.
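The std::sync::mpsc equivalent leaks the same way, and a bounded wait makes the leak observable without hanging the program: with the only sender forgotten, recv_timeout ends in a timeout rather than a disconnect. This is a std-only sketch, not chan's API, and recv_after_forget is a hypothetical name.

```rust
use std::mem::forget;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical helper (std, not chan): forgetting the only sender means
/// the channel is never closed, so a receive can only ever time out.
fn recv_after_forget() -> RecvTimeoutError {
    let (s, r) = mpsc::channel::<()>();
    // The sender is leaked, not dropped, so the sender count never hits 0.
    forget(s);
    // Wait a bounded time instead of calling `recv`, which would block forever.
    r.recv_timeout(Duration::from_millis(50)).unwrap_err()
}
```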
Warning: performance
The primary purpose of this crate is to provide a safe, concurrent abstraction. Notably, it is not a zero-cost abstraction. It is not even a near-zero-cost abstraction. Throughput on a channel is startlingly low (see the benchmarks in this crate's repository). Therefore, the channels provided in this crate are most useful as a means to structure concurrent programs at a coarse level.
If your requirements call for performant synchronization of data, chan is not the crate you're looking for.
Prior art
The semantics encoded in the channels provided by this crate should mirror or closely mirror the semantics provided by channels in Go. This includes select statements! The major difference between concurrent programs written with chan and concurrent programs written with Go is that Go programs can benefit from being fast and loose with creating goroutines. In chan, each "goroutine" is just an OS thread.
In terms of writing code:
- Go programs will feature explicit closing of channels. In chan, channels are closed only when all senders have been dropped.
- Since there is no such thing as a "nil" channel in chan, the semantics Go has for nil channels (both sends and receives block indefinitely) do not exist in chan.
- chan does not expose len or cap methods. (For no reason other than to start with a totally minimal API. In particular, calling len or cap on a channel is often The Wrong Thing. But not always. So this restriction will probably be lifted.)
- In chan, all channels are either senders or receivers. There is no "bidirectional" channel. This is manifest in how channel memory is managed: channels are closed when all senders are dropped.
Of course, Go is not the origin of these ideas, but it has been the strongest influence on the design of this library, and at least one of its authors has done substantial research on the integration of CSP and programming languages.
Macros
- chan_select!: Synchronize on at most one channel send or receive operation.
Structs
- Iter: An iterator over values received in a channel.
- Receiver: The receiving half of a channel.
- Sender: The sending half of a channel.
- WaitGroup: Provides synchronization on the completion of threads.
Functions
- after_ms: Creates a new rendezvous channel that is dropped after a timeout.
- async: Creates an asynchronous channel with an unbounded buffer.
- sync: Creates a synchronous channel with a possibly empty buffer.
- tick_ms: Creates a new rendezvous channel that is "ticked" every duration.