Storing a long-running process with an mpsc::Receiver can't await

Hello All,

I'm trying to make a library that would, among other things, spawn a long-running process fed by an mpsc::Receiver, so developers using it could just send items into it to be written somewhere without having to worry about the implementation (essentially like a plugin architecture). The most basic example of what I'm trying to achieve looks like this:

use std::sync::{mpsc::{channel, Receiver, Sender}, Arc, Mutex};
use tokio::{spawn, task::JoinHandle};
use rusoto_dynamodb::{DynamoDbClient,ListTablesInput,DynamoDb};
use rusoto_core::Region;

pub struct DynamoWriter {
    controller: Option<JoinHandle<()>>,
    client: Arc<DynamoDbClient>,
}

impl DynamoWriter {
    pub fn new(region: Option<String>) -> DynamoWriter {
        let region = region.unwrap_or_else(|| String::from("eu-west-1"));
        DynamoWriter{
            controller: None,
            client: Arc::new(DynamoDbClient::new(Region::Custom {
                name: region.to_owned(),
                endpoint: format!("dynamodb.{}.amazonaws.com", region),
            }))
        }
    }
    pub async fn start_write_stream(mut self) -> Sender<String> {
        let (sender, receiver) = channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let client = Arc::clone(&self.client);

        let cross_thread_receiver = Arc::clone(&receiver);

        self.controller = Some(spawn(async move {
            proc(cross_thread_receiver,client).await
        }));

        sender
    }
}



async fn proc(receiver: Arc<Mutex<Receiver<String>>>, client: Arc<DynamoDbClient>) {
    while let Ok(item) = receiver.lock().unwrap().recv() {
        client.list_tables(ListTablesInput{..Default::default()}).await.unwrap();
    };
}

But I get the error

future cannot be sent between threads safely

future created by async block is not `Send`

help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, std::sync::mpsc::Receiver<std::string::String>>`rustc

lib.rs(1, 1): required by a bound in this

lib.rs(39, 32): future created by async block is not `Send`

lib.rs(52, 26): has type `std::sync::MutexGuard<'_, std::sync::mpsc::Receiver<std::string::String>>` which is not `Send`

lib.rs(53, 9): await occurs here, with `receiver.lock().unwrap()` maybe used later


The part that has me really confused is that it compiles (but is totally useless) if I remove the .await.unwrap(); from the call to DynamoDB inside the while let loop.

I've tried boxing the call to proc and the spawn I put around it to store it in the object, but I'm still getting the error. I feel there is something fundamental I'm not understanding about futures and would really appreciate it if someone could explain where I've gone wrong.

Thanks

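The lifetime of the mutex guard is getting extended: the temporary MutexGuard created in the while let condition lives until the end of the loop body, so it is still alive at the .await (which is also why removing the .await makes the error go away). Try this: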

while let Ok(item) = {
    let res = receiver.lock().unwrap().recv();
    res
} {
    /* ... */
}
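
To see the effect in isolation, here is a minimal std-only sketch of the same loop. The names fake_write, require_send, block_on and run_demo are made up for illustration (fake_write stands in for the DynamoDB call, and block_on is a tiny busy-polling executor used only so the sketch runs without tokio). The require_send helper imposes the same Send bound that tokio::spawn does, so the sketch only compiles because the guard is dropped before the .await.

```rust
use std::future::Future;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the awaited DynamoDB call.
async fn fake_write(_item: String) {}

// Accepts only Send futures, mirroring the bound on tokio::spawn.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

async fn proc_fixed(receiver: Arc<Mutex<Receiver<String>>>) -> usize {
    let mut written = 0;
    while let Ok(item) = {
        // The MutexGuard temporary is dropped at the end of this statement.
        let res = receiver.lock().unwrap().recv();
        res
    } {
        // No guard is alive here, so nothing non-Send is held across the await.
        fake_write(item).await;
        written += 1;
    }
    written
}

// Tiny busy-polling executor, only so the sketch runs without a runtime.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Sends two items, closes the channel, and returns how many were processed.
pub fn run_demo() -> usize {
    let (sender, receiver) = channel();
    sender.send(String::from("a")).unwrap();
    sender.send(String::from("b")).unwrap();
    drop(sender); // closing the channel ends the loop
    block_on(require_send(proc_fixed(Arc::new(Mutex::new(receiver)))))
}
```

Note that recv() here still blocks the thread while it waits, so this only addresses the Send error, not the blocking.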

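You shouldn't use std::sync::mpsc inside async fns. Its recv() blocks the thread. Instead you should use tokio::sync::mpsc, whose recv() is awaited and so allows other async tasks to run on the same thread while it waits. Also, you shouldn't need the Arc<Mutex<T>> wrapper (for which .lock() blocks too). Receivers can't be cloned, but you don't need to share this one: move it directly into the spawned task, and clone the sender if several producers need it.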

To understand why you should never use std::sync::mpsc in async code, read this article. Also, it never makes sense to put an mpsc sender inside a mutex. Clone it instead.

Thank you for the responses,

I managed to make it work with geeklint's solution which helped me understand the specific problem I was having.

That being said, I read the article you linked, Alice, and now understand what you and bjorn3 are saying, so I will be changing it. I just have one more question to check that I understood it all correctly: does it make sense to use std::thread::spawn for the long-running process (the while loop on the receiver that will constantly be receiving new records to write to the DB) and tokio::spawn for the smaller I/O tasks that happen on that thread?

You should only put it in a dedicated thread if you have to block to do it.
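
For the case where the work really does block, a dedicated thread could be sketched like this, using only std. blocking_write is a hypothetical blocking call invented for illustration, and the byte count is returned only to make the sketch observable. With an async client such as the one in the question, a plain tokio task should be enough and this thread is not needed.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical blocking write; returns the number of bytes "written".
fn blocking_write(item: &str) -> usize {
    item.len()
}

pub fn start_blocking_writer() -> (Sender<String>, JoinHandle<usize>) {
    let (sender, receiver) = channel::<String>();

    // Blocking in recv is fine here: this thread does nothing else,
    // so no async tasks are starved while it waits.
    let handle = thread::spawn(move || {
        let mut bytes = 0;
        // Iterating the receiver blocks for each item and ends
        // once every sender has been dropped.
        for item in receiver {
            bytes += blocking_write(&item);
        }
        bytes
    });

    (sender, handle)
}
```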
