Tokio async and reading and writing same struct in different tasks

Hi,

I have a struct representing some state which should be updated by some tokio tasks while being read in "parallel" by other tasks.

I'm failing to get the bits lined up because the struct itself apparently isn't Send, and implementing Send by hand requires unsafe code. This must be a common use case, so I must be doing something wrong if it requires that.

My thinking:

  • two publishers (watch channels)
  • two tasks which periodically update those channels
  • simple struct (View) which has some state
  • a task which periodically renders the state
  • a select which updates the state in response to something being published

Because they need to synchronise, wrap it in an RwLock. Because tasks could cross threads, wrap it in an Arc.

Help :)

Following is the code snippet:

use std::{sync::Arc, time::Duration};

use chrono::{DateTime, Local};
use tokio::{
    sync::{watch, RwLock},
    time::sleep,
};

// the persistent state that is read and written to in "parallel"
#[derive(Debug)]
struct View {
    permission: bool,
    time: DateTime<Local>,
}

pub(crate) async fn do_it() {
    // the first publisher
    let (timer_tx, mut timer_rx) = watch::channel(Local::now());
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            timer_tx.send(Local::now()).unwrap();
        }
    });

    // the second publisher
    let (permission_tx, mut permission_rx) = watch::channel(true);
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(6)).await;
            permission_tx.send(false).unwrap();
        }
    });

    let view = View {
        permission: false,
        time: Local::now(),
    };
    let handler: Arc<RwLock<View>> = Arc::new(RwLock::new(view));

    // printout the view
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            let view = handler.read().await;
            println!("View: {:?}", view);
        }
    });

    // update the view
    tokio::spawn(async move {
        loop {
            tokio::select! { // THIS ERRORS BECAUSE View ISN'T SEND
                _ = timer_rx.changed() => {
                        println!("time now is {:?}", *timer_rx.borrow());
                        handler.write().await.time = *timer_rx.borrow();
                    }
                ,
                _ = permission_rx.changed() => {
                    println!("permission is {:?}", *permission_rx.borrow());
                    handler.write().await.permission = *permission_rx.borrow();
                }
            }
        }
    });
}

I don't really understand which error you ran into. Your View type is Send, so you must have misread something in the error message, but you didn't post it.

Anyway, for shared state that just has data, I recommend something like this:

use std::sync::Mutex;

#[derive(Clone)]
struct View {
    inner: Arc<Mutex<ViewInner>>,
}

struct ViewInner {
    permission: bool,
    time: DateTime<Local>,
}

impl View {
    fn set_time(&self, time: DateTime<Local>) {
        self.inner.lock().unwrap().time = time;
    }
    fn get_time(&self) -> DateTime<Local> {
        self.inner.lock().unwrap().time
    }
}

The most important feature of this design is to keep any use of the lock outside of async code. Instead, you define non-async methods on the struct and call those from async code.
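As a rough sketch of how the rest of such a wrapper might look: the `new`, `set_permission` and `snapshot` methods below are illustrative additions of mine, and a plain `u64` timestamp stands in for `DateTime<Local>` so that the sketch compiles without chrono.

```rust
use std::sync::{Arc, Mutex};

// Plain data guarded by the lock. `time` is a u64 stand-in for
// DateTime<Local> (an assumption to keep the sketch dependency-free).
#[derive(Debug, Clone, Copy, PartialEq)]
struct ViewInner {
    permission: bool,
    time: u64,
}

// Cheap-to-clone handle; every clone points at the same ViewInner.
#[derive(Clone)]
struct View {
    inner: Arc<Mutex<ViewInner>>,
}

impl View {
    // Hypothetical constructor, not part of the original suggestion.
    fn new(permission: bool, time: u64) -> Self {
        View {
            inner: Arc::new(Mutex::new(ViewInner { permission, time })),
        }
    }

    // Each method locks, touches the data, and releases the lock before
    // returning, so no guard can be held across an .await in the caller.
    fn set_time(&self, time: u64) {
        self.inner.lock().unwrap().time = time;
    }

    fn set_permission(&self, permission: bool) {
        self.inner.lock().unwrap().permission = permission;
    }

    // Copy the whole state out so the caller can print it without the lock.
    fn snapshot(&self) -> ViewInner {
        *self.inner.lock().unwrap()
    }
}
```

From async code you would then call something like `view.set_time(new_time)` directly, with no `.await` on that line.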


Thanks Alice - I can't believe I forgot the error!:

[Finished running. Exit status: 101]
[Running 'cargo run']
   Compiling prideandjoy v0.1.0 (/home/coliny/Dev/com.qfi.health/src/rust-monolith)
error: future cannot be sent between threads safely
   --> src/example1.rs:51:5
    |
51  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
   --> src/example1.rs:56:25
    |
56  |                         handler.clone().write().await.time = *timer_rx.borrow();
    |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^         ------------------ `timer_rx.borrow()` is later dropped here
    |                         |                                     |
    |                         |                                     has type `tokio::sync::watch::Ref<'_, DateTime<Local>>` which is not `Send`
    |                         await occurs here, with `timer_rx.borrow()` maybe used later
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/example1.rs:56:62
    |
56  |                         handler.clone().write().await.time = *timer_rx.borrow();
    |                                                              ^^^^^^^^^^^^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /home/coliny/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `prideandjoy` due to previous erro

Ah, so that's what's wrong. The error is that the Ref type returned by watch::Receiver::borrow must not be held across an .await point.

Anyway, it will go away if you use my design suggestion, because there won't be any .await in the line where you set the time.

Alice - you are a star as always! Thanks.

Rust is the continual fight between "can I do this?" and "how do I do this". The suggestion to make it Send challenged the first, hence the questions.

Thanks again.

The suggestion in the error would also work:

let new_time = *timer_rx.borrow();
handler.clone().write().await.time = new_time;

I'm still getting errors about the new View being moved... I'll try wrapping that in an Arc and awaiting it in the task itself.

You mean the View from my example? Don't wrap it in an Arc, there's already one inside it. You're supposed to clone the View itself.

If I use it directly then it is moved into the task that prints it out (because of the async move).

Let me put together another code sample...

That's why you clone it. In the same places as where you cloned the Arc before.

I still get:

error[E0382]: use of moved value: `view`
  --> src/example1.rs:65:29
   |
50 |       let view = View {
   |           ---- move occurs because `view` has type `View`, which does not implement the `Copy` trait
...
58 |       tokio::spawn(async move {
   |  _____________________________-
59 | |         loop {
60 | |             sleep(Duration::from_secs(1)).await;
61 | |             println!("View: {:?}", view.clone().inner.lock());
   | |                                    ---- variable moved due to use in generator
62 | |         }
63 | |     });
   | |_____- value moved here
64 |
65 |       tokio::spawn(async move {
   |  _____________________________^
66 | |         loop {
67 | |             tokio::select! {
68 | |                 _ = timer_rx.changed() => {
69 | |                         println!("time now is {:?}", *timer_rx.borrow());
70 | |                         view.clone().set_time(*timer_rx.borrow());
   | |                         ---- use occurs due to use in generator
...  |
78 | |         }
79 | |     });
   | |_____^ value used here after move

You need to clone before the new task is spawned. Once you're inside it, it's too late.
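A minimal sketch of that ordering, using `std::thread::spawn` as a stand-in for `tokio::spawn` so that it runs without any dependencies (a `move` closure takes ownership of captured values in the same way an `async move` block does). The `Counter` type and `run_two_workers` function are hypothetical names used only for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical shared handle with the Arc inside, like the View above.
#[derive(Clone)]
struct Counter {
    inner: Arc<Mutex<u32>>,
}

impl Counter {
    fn add(&self, n: u32) {
        *self.inner.lock().unwrap() += n;
    }

    fn get(&self) -> u32 {
        *self.inner.lock().unwrap()
    }
}

fn run_two_workers() -> u32 {
    let counter = Counter {
        inner: Arc::new(Mutex::new(0)),
    };

    // Clone BEFORE spawning: the closure takes ownership of `for_first`,
    // leaving `counter` itself available for the next clone.
    let for_first = counter.clone();
    let first = thread::spawn(move || for_first.add(1));

    let for_second = counter.clone();
    let second = thread::spawn(move || for_second.add(2));

    first.join().unwrap();
    second.join().unwrap();

    // The original handle was never moved, so it is still usable here.
    counter.get()
}
```

Calling `view.clone()` inside the spawned block does not help, because merely mentioning `view` there already moves the original into that block.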

Of course - sorry - thanks!
