Why the software paniced whit `future is not `Send` as this value is used across an await`

use futures::future::{BoxFuture, Future};
use tokio::sync::Mutex;
use tokio::time::{self, sleep, Duration};
#[macro_use]
extern crate lazy_static;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use signal_hook::consts::SIGTSTP; // catch ctrl + z signal



async fn a_job() {
    println!("I am working with a_job!");
    sleep(Duration::from_secs(1)).await;
}

async fn b_job() {
    println!("I am working with b_job!");
    sleep(Duration::from_secs(1)).await;
}

struct Job<R> {
    func: Box<dyn Fn() -> BoxFuture<'static, R> + Send + 'static>,
}

impl<R> Job<R> {
    fn new<F>(f: fn() -> F) -> Job<F::Output>
    where
        F: Future<Output = R> + Send + 'static,
    {
        Job {
            func: Box::new(move || Box::pin(f())),
        }
    }
    async fn run(&self) -> R {
        (self.func)().await
    }
}

lazy_static! {
    static ref GLOBAL: Arc<Mutex<Job<()>>> = Arc::new(Mutex::new(Job::new(a_job)));
}

#[tokio::main]
async fn main() {
    let (send, mut recv) = tokio::sync::mpsc::channel(1);
    let term = Arc::new(AtomicBool::new(false));
    signal_hook::flag::register(SIGTSTP, Arc::clone(&term)).unwrap();
    let send_trigger = tokio::spawn(async move {
        while !term.load(Ordering::Relaxed) {
            sleep(Duration::from_secs(1)).await;
        }
        send.send(()).await.unwrap();
    });
    let recv_trigger = tokio::spawn(async move {
        let mut interval = time::interval(time::Duration::from_secs(2));
        loop {
            tokio::select! {
               _ = interval.tick() => {
                let global = GLOBAL.clone();
                let ref f = global.lock().await;
                let _ = f.run().await;
               },
               Some(_) = recv.recv() => {
                    println!("shutting down");
                    return
               },
            }
        }
    });

    let _ = tokio::join!(send_trigger, recv_trigger);
}

dependencies:

[dependencies]
tokio = { version = "1", features = ["time", "full"] }
futures = "0.3.18"
lazy_static = "1.4.0"
signal-hook = "0.3.12"

When I run the code, it paniced with future is not Send as this value is used across an await and ``f is later dropped here. And the total error is like:

Compiling tokio_time v0.1.0 (/Users/banmengtao/rustp/rucron)
error: future cannot be sent between threads safely
   --> src/main.rs:55:24
    |
55  |     let recv_trigger = tokio::spawn(async move {
    |                        ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `Sync` is not implemented for `(dyn Fn() -> Pin<Box<(dyn futures::Future<Output = ()> + std::marker::Send + 'static)>> + std::marker::Send + 'static)`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:62:25
    |
62  |                 let _ = f.run().await;
    |                         ^^^^^^^^^^^^^ first, await occurs here, with `f` maybe used later...
note: `f` is later dropped here
   --> src/main.rs:62:38
    |
62  |                 let _ = f.run().await;
    |                         -            ^
    |                         |
    |                         has type `&Job<()>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:62:25
    |
62  |                 let _ = f.run().await;
    |                         ^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /Users/banmengtao/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `tokio_time` due to previous error

The error you are mentioning is a compile failure, not a panic. Anyway, when you post compile failures, please include the full message as cargo build outputs it. Without the full error message, I am missing important information that I need to be able to help you.

1 Like

Sorry, I have re-edited the post.

Add Sync to the box in Job.

1 Like

impl<R> Job<R> {
    fn new<F>(f: fn() -> F) -> Job<F::Output>
    where
        F: Future<Output = R> + Send + Sync + 'static,
    {
        Job {
            func: Box::new(move || Box::pin(f())),
        }
    }
    async fn run(&self) -> R {
        (self.func)().await
    }
}

Is that right? if so, it doesn't work :joy:

I got it, you are a genius.BTW, why is that?

Because the meaning of a type being Sync is that immutable references to it are Send, and your run method has an immutable reference to self.

1 Like

You're the man!:+1:

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.