I have a thing which represents some state which should be updated in some tokio tasks whilst, in "parallel", being read in other tasks.
I'm failing to get the bits lined up because the struct itself isn't Send, which apparently requires unsafe code. This must be a common use case, so I must be doing something wrong if it requires Send.
My thinking:
two publishers (watch channels)
two tasks which periodically update those channels
simple struct (View) which has some state
a task which periodically renders the state
a select which updates the state in response to something being published
Because they need to synchronise, wrap it in an RwLock. Because tasks could cross threads, wrap it in an Arc.
Help
Following is the code snippet:
use std::{sync::Arc, time::Duration};

use chrono::{DateTime, Local};
use tokio::{
    sync::{watch, RwLock},
    time::sleep,
};

// the persistent state that is read and written to in "parallel"
#[derive(Debug)]
struct View {
    permission: bool,
    time: DateTime<Local>,
}

pub(crate) async fn do_it() {
    // the first publisher
    let (timer_tx, mut timer_rx) = watch::channel(Local::now());
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            timer_tx.send(Local::now()).unwrap();
        }
    });

    // the second publisher
    let (permission_tx, mut permission_rx) = watch::channel(true);
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(6)).await;
            permission_tx.send(false).unwrap();
        }
    });

    let mut view = View {
        permission: false,
        time: Local::now(),
    };
    let handler: Arc<RwLock<View>> = Arc::new(RwLock::new(view));

    // printout the view
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            let view = handler.read().await;
            println!("View: {:?}", view);
        }
    });

    // update the view
    tokio::spawn(async move {
        loop {
            tokio::select! { // THIS ERRORS BECAUSE View ISN'T SEND
                _ = timer_rx.changed() => {
                    println!("time now is {:?}", *timer_rx.borrow());
                    handler.write().await.time = *timer_rx.borrow();
                },
                _ = permission_rx.changed() => {
                    println!("permission is {:?}", *permission_rx.borrow());
                    handler.write().await.permission = *permission_rx.borrow();
                }
            }
        }
    });
}
I don't really understand which error you ran into. Your View type is Send, so you must have misunderstood something from the error message, but you didn't post it.
Anyway, for shared state that just has data, I recommend something like this:
The most important feature of this design is to keep any use of the lock outside of async code. Instead, you define non-async methods on the struct and call those from async code.
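A minimal sketch of that design, under stated assumptions; it is not the code from the original reply. The `inner` field and the `set_time` method are named after identifiers visible in the compiler output later in this thread. `SharedView`, `ViewState`, `set_permission` and `snapshot` are hypothetical names, and `std::time::SystemTime` stands in for `DateTime<Local>` so the sketch needs no external crates.

```rust
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

// Plain data guarded by the lock.
// Assumption: `SystemTime` stands in for the thread's `DateTime<Local>`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ViewState {
    permission: bool,
    time: SystemTime,
}

// Cheaply cloneable handle; every clone points at the same state.
#[derive(Debug, Clone)]
struct SharedView {
    inner: Arc<Mutex<ViewState>>,
}

impl SharedView {
    fn new(permission: bool, time: SystemTime) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ViewState { permission, time })),
        }
    }

    // Non-async: the guard is created and dropped inside this call,
    // so it can never be held across an `.await`.
    fn set_time(&self, time: SystemTime) {
        self.inner.lock().unwrap().time = time;
    }

    fn set_permission(&self, permission: bool) {
        self.inner.lock().unwrap().permission = permission;
    }

    // Hands back a copy, so callers never see the guard itself.
    fn snapshot(&self) -> ViewState {
        let state = *self.inner.lock().unwrap();
        state
    }
}
```

From a task, a call such as `view.set_time(*timer_rx.borrow())` contains no `.await`, so neither the mutex guard nor the watch `Ref` is alive at a suspension point.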
Thanks Alice - I can't believe I forgot the error!:
[Finished running. Exit status: 101]
[Running 'cargo run']
Compiling prideandjoy v0.1.0 (/home/coliny/Dev/com.qfi.health/src/rust-monolith)
error: future cannot be sent between threads safely
   --> src/example1.rs:51:5
    |
51  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
   --> src/example1.rs:56:25
    |
56  |                         handler.clone().write().await.time = *timer_rx.borrow();
    |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^        ------------------ `timer_rx.borrow()` is later dropped here
    |                         |                                    |
    |                         |                                    has type `tokio::sync::watch::Ref<'_, DateTime<Local>>` which is not `Send`
    |                         await occurs here, with `timer_rx.borrow()` maybe used later
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/example1.rs:56:62
    |
56  |                         handler.clone().write().await.time = *timer_rx.borrow();
    |                                                              ^^^^^^^^^^^^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /home/coliny/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`
error: could not compile `prideandjoy` due to previous error
error[E0382]: use of moved value: `view`
  --> src/example1.rs:65:29
   |
50 |       let view = View {
   |           ---- move occurs because `view` has type `View`, which does not implement the `Copy` trait
...
58 |       tokio::spawn(async move {
   |  _____________________________-
59 | |         loop {
60 | |             sleep(Duration::from_secs(1)).await;
61 | |             println!("View: {:?}", view.clone().inner.lock());
   | |                                    ---- variable moved due to use in generator
62 | |         }
63 | |     });
   | |_____- value moved here
64 |
65 |       tokio::spawn(async move {
   |  _____________________________^
66 | |         loop {
67 | |             tokio::select! {
68 | |                 _ = timer_rx.changed() => {
69 | |                     println!("time now is {:?}", *timer_rx.borrow());
70 | |                     view.clone().set_time(*timer_rx.borrow());
   | |                     ---- use occurs due to use in generator
...  |
78 | |         }
79 | |     });
   | |_____^ value used here after move
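One way to read this error: calling `view.clone()` inside the `async move` block does not help, because the block has already taken ownership of `view` itself. A common fix is to clone the handle before each spawn and move the clone in. The sketch below shows that ownership pattern with `std::thread::spawn` and `move` closures so it needs no external crates; the same rule should apply to `tokio::spawn(async move { ... })`. `Handle`, `add`, `get` and `run_two_tasks` are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the cloneable view handle from this thread.
#[derive(Clone)]
struct Handle {
    inner: Arc<Mutex<u32>>,
}

impl Handle {
    fn add(&self, n: u32) {
        *self.inner.lock().unwrap() += n;
    }

    fn get(&self) -> u32 {
        let value = *self.inner.lock().unwrap();
        value
    }
}

fn run_two_tasks() -> u32 {
    let view = Handle { inner: Arc::new(Mutex::new(0)) };

    // Clone *before* each `move` closure: the closure takes ownership of
    // its own clone, and `view` stays usable in this function.
    let printer_view = view.clone();
    let printer = thread::spawn(move || {
        println!("View: {}", printer_view.get());
    });

    let updater_view = view.clone();
    let updater = thread::spawn(move || {
        updater_view.add(1);
    });

    printer.join().unwrap();
    updater.join().unwrap();

    // All clones share one `Arc`, so the update is visible through `view`.
    view.get()
}
```

Cloning here only bumps the `Arc` reference count; the state itself is not copied.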