use futures::future::{BoxFuture, Future};
use tokio::sync::Mutex;
use tokio::time::{self, sleep, Duration};
#[macro_use]
extern crate lazy_static;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use signal_hook::consts::SIGTSTP; // catch ctrl + z signal
async fn a_job() {
println!("I am working with a_job!");
sleep(Duration::from_secs(1)).await;
}
async fn b_job() {
println!("I am working with b_job!");
sleep(Duration::from_secs(1)).await;
}
struct Job<R> {
func: Box<dyn Fn() -> BoxFuture<'static, R> + Send + 'static>,
}
impl<R> Job<R> {
fn new<F>(f: fn() -> F) -> Job<F::Output>
where
F: Future<Output = R> + Send + 'static,
{
Job {
func: Box::new(move || Box::pin(f())),
}
}
async fn run(&self) -> R {
(self.func)().await
}
}
lazy_static! {
static ref GLOBAL: Arc<Mutex<Job<()>>> = Arc::new(Mutex::new(Job::new(a_job)));
}
#[tokio::main]
async fn main() {
let (send, mut recv) = tokio::sync::mpsc::channel(1);
let term = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(SIGTSTP, Arc::clone(&term)).unwrap();
let send_trigger = tokio::spawn(async move {
while !term.load(Ordering::Relaxed) {
sleep(Duration::from_secs(1)).await;
}
send.send(()).await.unwrap();
});
let recv_trigger = tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(2));
loop {
tokio::select! {
_ = interval.tick() => {
let global = GLOBAL.clone();
let ref f = global.lock().await;
let _ = f.run().await;
},
Some(_) = recv.recv() => {
println!("shutting down");
return
},
}
}
});
let _ = tokio::join!(send_trigger, recv_trigger);
}
dependencies:
[dependencies]
tokio = { version = "1", features = ["time", "full"] }
futures = "0.3.18"
lazy_static = "1.4.0"
signal-hook = "0.3.12"
When I run the code, it paniced with future is not
Send as this value is used across an await
and ``f is later dropped here
. And the total error is like:
Compiling tokio_time v0.1.0 (/Users/banmengtao/rustp/rucron)
error: future cannot be sent between threads safely
--> src/main.rs:55:24
|
55 | let recv_trigger = tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `Sync` is not implemented for `(dyn Fn() -> Pin<Box<(dyn futures::Future<Output = ()> + std::marker::Send + 'static)>> + std::marker::Send + 'static)`
note: future is not `Send` as this value is used across an await
--> src/main.rs:62:25
|
62 | let _ = f.run().await;
| ^^^^^^^^^^^^^ first, await occurs here, with `f` maybe used later...
note: `f` is later dropped here
--> src/main.rs:62:38
|
62 | let _ = f.run().await;
| - ^
| |
| has type `&Job<()>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
--> src/main.rs:62:25
|
62 | let _ = f.run().await;
| ^^^^^^^
note: required by a bound in `tokio::spawn`
--> /Users/banmengtao/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: could not compile `tokio_time` due to previous error