Sharing futures across threads insidie async block callbacks shared across threads

Hello friends!

I want to build a scheduler, which runs callbacks with async blocks inside.
It looks like I need to pin the callback itself and the Future returned from it.

use tokio::time::{sleep, Duration};
use tokio::sync::{Mutex};
use reqwest;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc};

pub type Cb = Pin<Box<dyn Fn(usize) -> Pin<Box<dyn Future<Output=()> + Send + Sync>> + Send + Sync>>;

#[tokio::main]
async fn main() {
    let futures: Arc<Mutex<Vec<Cb>>> = Arc::new(Mutex::new(vec![]));

    futures.lock().await.push(Box::pin(|i| Box::pin(async move {
        sleep(Duration::from_secs(1)).await;
        println!("One second passed! {}", i);
    })));

    futures.lock().await.push(Box::pin(|i| Box::pin(async move {
        sleep(Duration::from_secs(2)).await;
        println!("Two second passed! {}", i);
    })));

    tokio::spawn(async move {
        let lock = futures.lock().await;

        for (i, fut) in (*lock).iter().enumerate() {
            fut(i).await;
        }
    }).await.ok();
}

This runs fine, but when I'm attempting to call a lib function with async code, like the following:

    futures.lock().await.push(Box::pin(|i| Box::pin(async move {
        sleep(Duration::from_secs(2)).await;
        println!("Two second passed! {}", i);

        let resp = reqwest::get("https://httpbin.org/ip").await
            .unwrap()
            .json::<HashMap<String, String>>()
            .await
            .unwrap();
        println!("{:#?}", resp);
    })));

I get a pretty huge error about the Future not having the Send trait:

error[E0277]: `(dyn std::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src\main.rs:162:44
    |
162 |       futures.lock().await.push(Box::pin(|i| Box::pin(async move {
    |  ____________________________________________^
163 | |         sleep(Duration::from_secs(2)).await;
164 | |         println!("Two second passed! {}", i);
165 | |
...   |
171 | |         println!("{:#?}", resp);
172 | |     })));
    | |______^ `(dyn std::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely

It might be that I'm missing some very obvious solution, but I'm a bit stuck with this.

How do I and the Send to this future?

The particular failure you encountered would be fixed by this PR. The easiest fix for you is probably to remove + Sync from your boxed future (but not the boxed Fn).

1 Like

Also, it doesn't make sense to put a Pin around an dyn Fn. You should only have it around the dyn Future.

Thank you, Alice!

Simplifying the annotation did the trick:

pub type Callback =
    Box<dyn Fn(usize) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.