Is there a Rust library which mimics closely the Go [goroutines / channels / select] paradigm?

Is there a Rust library which mimics closely the Go [goroutines / channels / select] paradigm ?

Or in other words, is there a Rust library which would allow one to translate Go code to Rust code nearly literally, i.e. a Rust library with:

  • Channels,
  • Buffered channels,
  • Unbuffered channels (blocking channels),
  • A select mechanism to switch to the active channel amongst a set of channels (the one which is currently receiving data) without having to check all channels of a set one by one linearly.
  • A mechanism to close a channel once all data have been send down said channel.
  • A corresponding mechanism on the receiving end to check if a channel has been closed by a sending routine.

So, the main thing you are requesting are channels. While the standard library features a basic one ::std::sync::mpsc, I think that using dedicated crates will lead to richer APIs as well as enhanced performance. One important thing to observe here is given the precise / explicit nature of Rust, the async vs. non-async difference is important. I'd say that the async ecosystem etc. may get you closer to Go's goroutines; but if you don't have too many "tasks", then good old thread spawning and using non-async channels might work as well.

Non-async channel libraries

  • Optimized for performance

  • Actually, looking at its API, it seems to feature async primitives as well!

async channel libraries

  • As mentioned, flume might actually cover this case as well;
  • a runtime agnostic library.

And now for the potentially runtime "biased" libraries, which also means they're guaranteed to have optimal interaction with said runtimes:

  • The whole sync module of the tokio framework is quite well documented and definitely battle-tested for production.
  • This is the crate used by the async-std framework to handle channels

Select-ing

In the async world, I recommend tokio's select! macro to perform this: the idea is that once you have several async tasks (outermost futures), you can, inside an async context, select on their completion. And receiving (and blocking-sending) from (async) channels yield such futures.

For the non-async / sync world, ::flume features an interesting (non-macro!) Selector API, and crossbeam features both a builder-based select API, and its own select! macro.


Some things to be mindful of in Rust's async world

The following blog post may not be the best entry-level read for async Rust, but it does talk about some pitfalls / footguns in this environment. Special mention to select!-ed futures which may not be polled to completion if they don't win the first "race": for a select!-ed future to be polled to completion, it needs to be enqueued onto an executor beforehand, through the .spawn() method:

Finally, when dealing with Futures and the async world, Pin and Unpin will come back from time to time. In that regard, the following blog post has proven to be quite good at clarifying the situation:

Generally, for a nice blog post with practical examples, both the tokio guide, and the blog of one of its main maintainers, @alice, feature some interesting reads, such as: Actors with Tokio – Alice Ryhl

8 Likes

Thank you very much!

1 Like

My approach to getting Go like threads and channels in Rust was to skip all the heavy duty study of how Rust does async, futures, pin, etc, and just go straight to Tokio:
https://docs.rs/tokio/0.2.0/tokio/
https://tokio.rs/

That has worked out well.

1 Like

That doc link is to an old version of Tokio :slight_smile:

1 Like

I had a go at the Tokio lib.

I tried to put together an exemple using two producers and two consumers :

Sometimes it seems to works, but most of the time the main exits before the ancillary threads.

  • Perhaps I should have used just one channel, but in that case, how do I share the senders / receivers between workers?

  • Perhaps there is a kind of join() missing to make the main wait for the worker threads?

use tokio::sync::mpsc;

#[tokio::main]


async fn main() {
    
    let (tx2, mut rx2) = mpsc::channel(1);

    let (tx1, mut rx1) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(i) = rx2.recv().await {
            println!("1 got = {}", i);
        }
    });
    
    tokio::spawn(async move {
        while let Some(i) = rx1.recv().await {
            println!("2 got = {}", i);
        }
    });

    tokio::spawn(async move {
        for i in 0..10 {
            if let Err(_) = tx2.send(i).await {
                println!("receiver dropped");
                return;
            }
        }
    });

    tokio::spawn(async move {
        for i in 0..50 {
            if let Err(_) = tx1.send(i).await {
                println!("receiver dropped");
                return;
            }
        }
    });
}

To avoid exiting too early, you can ask it to wait for the tasks to quit like this:

let join = tokio::spawn(...);

// wait for spawned task to quit
join.await.unwrap();

The program will exit once main returns.

To share channels, you can clone the sender object. The receiver of an mpsc channel cannot be shared.

1 Like