Hello All,
I'm trying to write a library that would, among other things, spawn a long-running process fed by an mpsc::Receiver, so that developers using it can just send items into a channel to be written somewhere without having to worry about the implementation (essentially a plugin architecture). The most basic example of what I'm trying to achieve looks like this:
use std::sync::{mpsc::{channel, Receiver, Sender}, Arc, Mutex};
use tokio::{spawn, task::JoinHandle};
use rusoto_dynamodb::{DynamoDbClient, ListTablesInput, DynamoDb};
use rusoto_core::Region;

pub struct DynamoWriter {
    controller: Option<JoinHandle<()>>,
    client: Arc<DynamoDbClient>,
}

impl DynamoWriter {
    pub fn new(region: Option<String>) -> DynamoWriter {
        let region = region.unwrap_or_else(|| String::from("eu-west-1"));
        DynamoWriter {
            controller: None,
            client: Arc::new(DynamoDbClient::new(Region::Custom {
                name: region.to_owned(),
                endpoint: format!("kinesis.{}.amazonaws.com", region),
            })),
        }
    }

    pub async fn start_write_stream(mut self) -> Sender<String> {
        let (sender, receiver) = channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let client = Arc::clone(&self.client);
        let cross_thread_receiver = Arc::clone(&receiver);
        self.controller = Some(spawn(async move {
            proc(cross_thread_receiver, client).await
        }));
        sender
    }
}

async fn proc(receiver: Arc<Mutex<Receiver<String>>>, client: Arc<DynamoDbClient>) {
    while let Ok(item) = receiver.lock().unwrap().recv() {
        client.list_tables(ListTablesInput { ..Default::default() }).await.unwrap();
    };
}
But I get the following error:
future cannot be sent between threads safely
future created by async block is not `Send`
help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, std::sync::mpsc::Receiver<std::string::String>>` rustc
lib.rs(1, 1): required by a bound in this
lib.rs(39, 32): future created by async block is not `Send`
lib.rs(52, 26): has type `std::sync::MutexGuard<'_, std::sync::mpsc::Receiver<std::string::String>>` which is not `Send`
lib.rs(53, 9): await occurs here, with `receiver.lock().unwrap()` maybe used later
The part that has me really confused is that it compiles (but is totally useless) if I remove the .await.unwrap(); from the call to Dynamo inside the while let loop.
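Going by the last two lines of the error, the guard returned by receiver.lock().unwrap() in the while let condition appears to stay alive for the whole loop body, so it is still held when the .await is reached; without the .await there is no suspension point for it to be held across. Below is a minimal, std-only sketch of one way around that, not a definitive fix: the lock is taken inside a small synchronous helper, so the guard is dropped before any .await. The names fake_write, next_item, drain, assert_send, block_on and run_demo are hypothetical and only for illustration; fake_write stands in for the DynamoDB call, and block_on is a toy executor so the sketch runs without tokio.

```rust
use std::future::Future;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the awaited DynamoDB call.
async fn fake_write(item: String) -> usize {
    item.len()
}

// The MutexGuard is created and dropped inside this synchronous function,
// so it can never be held across an `.await`.
fn next_item(receiver: &Mutex<Receiver<String>>) -> Option<String> {
    receiver.lock().unwrap().recv().ok()
}

// Same shape as `proc` in the post, but the loop condition only yields
// an owned `String`, not a lock guard.
async fn drain(receiver: Arc<Mutex<Receiver<String>>>) -> usize {
    let mut total = 0;
    while let Some(item) = next_item(&receiver) {
        total += fake_write(item).await;
    }
    total
}

// Compile-time check: this only compiles if the value is `Send`,
// which is the bound the error message says is required.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Toy executor (assumption: only here to keep the sketch free of tokio).
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

pub fn run_demo() -> usize {
    let (sender, receiver) = channel();
    sender.send(String::from("abc")).unwrap();
    sender.send(String::from("de")).unwrap();
    drop(sender); // closing the channel ends the loop in `drain`
    block_on(assert_send(drain(Arc::new(Mutex::new(receiver)))))
}
```

Calling run_demo() returns 5 (the summed lengths of "abc" and "de"). Note that recv() here still blocks the executor thread while it waits, so in a real tokio task an async channel such as tokio::sync::mpsc may be a better fit; that is a design suggestion rather than something this sketch demonstrates.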
I've tried boxing the call to proc, and also boxing the spawn I wrapped around it to store it in the object, but I still get the same error. I feel there is something fundamental I'm not understanding about futures, and I would really appreciate it if someone could explain where I've gone wrong.
Thanks