Async tasks and sender

A Rust newbie here! I am facing a challenge for which I can't see a clear solution, and I would appreciate any pointer that at least points me in the right direction. The challenge is as follows:

I have to use an async function from a library which takes a std::sync::mpsc::Sender as input (the library does not accept tokio::sync) and provides real-time measurement values via the channel.

For testing and illustration purposes, I created a sandbox version of the library call as follows (the internals of the real function are a black box):

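```rust
use std::sync::mpsc::Sender;
use std::{thread, time};

async fn a_lib_func(tx: Sender<String>, name: String) {
    loop {
        // Simulates waiting for new data (std sleep blocks the whole thread)
        thread::sleep(time::Duration::from_secs(1));
        let now = time::Instant::now();
        tx.send(format!("[{}] {:?}", name, now)).unwrap();
    }
}
```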

In the real application, I will need tens of these tasks running concurrently. Thus, there will be a main monitoring async fn along the following lines (using only one task for simplicity):

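```rust
async fn monitoring() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();

    // Run the library function as its own task; it owns the Sender.
    tokio::task::spawn(async move {
        a_lib_func(tx, "one".to_string()).await;
    });

    // Print each measurement as it arrives (std recv() blocks the current thread).
    while let Ok(msg) = rx.recv() {
        println!("[a_lib_func] {:?}", msg);
    }
}

#[tokio::main]
async fn main() {
    monitoring().await;
}
```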

When I run this, everything works as expected. Now for the challenge: several of these tasks will need some post-processing before the monitoring loop can digest the input, so I tried to create a simple "wrapper" function that calls the library function a_lib_func(). For this example, the wrapper function is practically identical to the monitoring function:

```rust
async fn wrapper() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();

    tokio::task::spawn(async move {
        a_lib_func(tx, "two".to_string()).await;
    });

    while let Ok(msg) = rx.recv() {
        // would post-process the data here and pass it back in a channel,
        // left out for simplicity's sake
        println!("[wrapper] {:?}", msg);
    }
}
```

The wrapper function would contain some additional communication channels to pass the processed data back. This is replaced by a simple println! for testing purposes.

The adjusted monitoring function using the wrapper of the library function looks then as follows:

```rust
async fn monitoring() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();

    // Direct library task, reporting straight to the monitor
    tokio::task::spawn(async move {
        a_lib_func(tx, "one".to_string()).await;
    });

    // Wrapped library task with its own channel and post-processing
    tokio::task::spawn(async move {
        wrapper().await;
    });

    while let Ok(msg) = rx.recv() {
        println!("[a_lib_func] {:?}", msg);
    }
}
```

But now I only get the output from the direct a_lib_func task, but not from the wrapper task.

As I need to run these tasks concurrently, and the message processing needs to happen reactively and as close to real time as possible, blocking functions or polling via try_recv (as suggested elsewhere) do not seem to be feasible solutions.

Is there a way to make the wrapper function work at all within Rust? Or is there a more idiomatic way to post-process the data from the library function before passing it on to the monitoring function? Or do I need to find another library that takes tokio::sync::mpsc as input? (With tokio's Sender the wrapper works as expected, by the way. Unfortunately I cannot change the library function and am stuck with std::sync.)

Any help or pointers would be greatly appreciated.

I’m having a hard time following what exactly your full code looks like at each stage and what exact behavior you’re observing. So for more feedback on that behavior, and why it might happen one way or another, you should probably be more precise and show full code examples if you can, perhaps even some that run reproducibly in the playground.

In any case, you’re pretty much abusing async fns in all these examples. Feel free to read this blog post to learn more about why this matters, and about approaches to mitigate the problem while staying within an async fn framework/system…

…but in short: the huge problem with your code is that you are calling a lot of blocking functions in async fns. This includes in particular .recv() on a std::sync::mpsc::Receiver, and calls to std::thread::sleep.

In fact, you currently seem not to be using any API at all that can leverage the power of async Rust to save the overhead of needing many system threads. If the code you’re showing here is indicative of your whole use case, you’ll probably be better off just using std::thread::spawn and dropping tokio and async fns, which has the added benefit of being easier to work with and having fewer caveats or things you need to pay attention to.
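A minimal sketch of that thread-only design, assuming the library call can be driven as an ordinary blocking function (the real one is an async fn, so it would still need an executor to drive it inside its thread). The names producer, wrapper and the count parameter are hypothetical: producer stands in for a_lib_func but stops after count messages so the example terminates, and the "[wrapper]" prefix is placeholder post-processing.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for the library: a plain blocking producer that sends
// `count` timestamped messages and then returns (the real one loops forever).
fn producer(tx: Sender<String>, name: String, count: usize) {
    for _ in 0..count {
        thread::sleep(Duration::from_millis(10));
        let now = Instant::now();
        if tx.send(format!("[{}] {:?}", name, now)).is_err() {
            return; // receiver was dropped: stop producing
        }
    }
}

// Wrapper: owns its own std channel to the producer, post-processes each
// message and forwards it to the monitor through `out`.
fn wrapper(out: Sender<String>, count: usize) {
    let (tx, rx) = mpsc::channel::<String>();
    thread::spawn(move || producer(tx, "two".to_string(), count));
    // Blocking recv() is fine here: this is a dedicated OS thread.
    for msg in rx {
        let processed = format!("[wrapper] {}", msg); // placeholder post-processing
        if out.send(processed).is_err() {
            break;
        }
    }
}

// Monitor: one direct producer plus one wrapped producer feeding one channel.
fn monitoring(count: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let tx_direct = tx.clone();
    thread::spawn(move || producer(tx_direct, "one".to_string(), count));
    thread::spawn(move || wrapper(tx, count));
    // The loop ends once every Sender (direct producer and wrapper) is dropped.
    let mut seen = Vec::new();
    for msg in rx {
        println!("[monitor] {}", msg);
        seen.push(msg);
    }
    seen
}

fn main() {
    let seen = monitoring(3);
    println!("received {} messages", seen.len());
}
```

Each source gets its own thread, so a blocking recv() in one place cannot starve the others; the only shared resource is the monitor's channel.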

I’m only noticing this on second read: so you’re saying that it’s an async fn library function that requires a std::sync::mpsc::Sender argument, is that correct? (Because that appears, at least potentially, a bit inconsistent.) Is that a particular open-source library (so we can learn more about its intended usage)? Is the library itself properly implemented? (I.e. unlike your “sandbox version”, which uses a blocking thread::sleep inside an async fn.)

@steffahn Thanks for the feedback.

The code consists of pretty much what was included, except for the library imports. I have put it up on the playground (the playground has a timeout, which I don't know how to extend, so the code only runs for a few seconds).

Regarding the call to sleep: agreed, it was sloppy to use a blocking version, but changing to the non-blocking one does not change anything, unfortunately.

I can't share the library itself as it's closed source, but it behaves pretty much the way a_lib_func behaves with regard to the use of std::sync::mpsc::Sender, except that the sleep is replaced with some other IO-heavy async functions. It basically connects to a source using websockets, pre-processes the returned stream of data and passes it back through the channel.

In this functionally representative, but otherwise simplified, example, I can replace std::sync::mpsc::Receiver with the tokio one, and then the output from the wrapper function also shows up. In the real life application and for this library only, I have to use std::sync::mpsc. Aside from the post-processing, the other reason for the wrapper function is to isolate the use of std::sync::mpsc for the specific cases where this particular library needs to be used.

The reason I wanted to use async (and actually for using Rust on the project) is simply performance. My understanding was that std::thread::spawn can add some overhead (even if just a few ms), which I'm trying to avoid.

Threads are expensive to create and occupy more memory than they strictly need (the thread stack). They are not expensive to run (unless you have many active threads contending for available CPU cores). You should try using plain threads for these purposes and measure the performance you get from that.
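To put a number on the creation cost on your own machine, a rough sketch like the following could be used. average_spawn_join is a hypothetical helper, and spawning and joining an empty thread only approximates what starting a real worker costs. Since each measurement source would get one long-lived thread, this cost is paid once at startup rather than once per message.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Spawns and immediately joins `n` empty threads, returning the average
// cost of one spawn + join. `n` must be greater than zero.
fn average_spawn_join(n: u32) -> Duration {
    assert!(n > 0, "n must be positive");
    let start = Instant::now();
    for _ in 0..n {
        thread::spawn(|| {}).join().unwrap();
    }
    start.elapsed() / n
}

fn main() {
    // With long-lived worker threads this cost is paid once per source,
    // not once per measurement value.
    let avg = average_spawn_join(1_000);
    println!("average spawn + join: {:?}", avg);
}
```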

The point of async isn't (necessarily) “avoid threads”, it's “make use of few threads for many tasks”. If you don't have many tasks then threads are likely just fine, and may even have less (relevant) overhead because they're not passing through the async executor.

But, this is speculation, as all theoretical recommendations for performance are. If you care about getting the best performance, you must write both versions and measure them. (And make sure you're measuring the right thing — for example, do you care about throughput or latency?)
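As one possible starting point for measuring latency (rather than throughput), here is a sketch that timestamps each message when it is sent and computes the delay when it is received. It uses only std threads and channels; measure_channel_latency is a hypothetical helper, and for a fair comparison the same measurement would be repeated in the tokio-based version.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Sends `count` timestamps from a producer thread and returns the
// send-to-receive latency observed for each one by the receiving thread.
fn measure_channel_latency(count: usize) -> Vec<Duration> {
    let (tx, rx) = mpsc::channel::<Instant>();
    let producer = thread::spawn(move || {
        for _ in 0..count {
            tx.send(Instant::now()).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
        // `tx` is dropped here, which ends the receiver's iterator.
    });
    let latencies: Vec<Duration> = rx.iter().map(|sent| sent.elapsed()).collect();
    producer.join().unwrap();
    latencies
}

fn main() {
    let mut lat = measure_channel_latency(100);
    lat.sort();
    let median = lat[lat.len() / 2];
    let max = *lat.last().unwrap();
    println!("median {:?}, max {:?}", median, max);
}
```

Looking at the median and the maximum separately helps, since occasional scheduling hiccups show up in the tail rather than the median.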