Shared data in async : please help

Am still getting up the learning curve with async and shared data concepts in rust, and am facing the following problem, which I can't seem to be able to solve:

I have several worker tasks, wiring to a shared structure asynchronously. There is also a monitoring task which concurrently and continuously has observe changes in the shared structure and, given certain constraints are met, update another structure as communicate the results.

I tried all sorts of combinations between running the monitoring task asynchronously or in blocking mode and of using Arc<RwLock>, but I always end up in running either into some move issues, or some send problems.

The real program is more involved (with multiple workers, and more complex processing), but below is a distilled (non-working!) code of the latest version (also on playground)


use std::sync::{Arc, RwLock};
use rand::{Rng, Error};
use tokio::sync::mpsc::{self, Sender};

type ValueResult = Result<f64, Error>;

#[derive(Debug, Clone)]
struct DataVal {
    a_val: f64,
}

#[derive(Debug, Clone)]
struct DataObject {
    max_val: f64,
}

impl DataObject {
    fn new() -> DataObject {
        DataObject {
            max_val: 0.0,
        }
    }
}

async fn monitor(mut dobj: DataObject, shareone:  Arc<RwLock<DataVal>>, tx: Sender<ValueResult>) {
    loop {
        if let Ok(sdata) = shareone.read() {
            if sdata.a_val > dobj.max_val {
                dobj.max_val = sdata.a_val;
                match tx.send(Ok(dobj.max_val)).await {
                    Ok(_) => {},
                    Err(msg) => println!("Error in sending {:?}", msg)
                }
            }
        }
    }
}

async fn data_writer(interval: u64, shared_data: Arc<RwLock<DataVal>>) {
    let mut rng = rand::thread_rng();

    loop {
        if let Ok(mut sdata) = shared_data.write() {
            sdata.a_val = rng.gen::<f64>();
        }
        std::thread::sleep(std::time::Duration::from_secs(interval));
    }
}

async fn run() {
    let (tx, mut rx) = mpsc::channel::<ValueResult>(3);
    let shared: Arc<RwLock<DataVal>> = Arc::new(RwLock::new(DataVal {a_val: 0.0}));

    let mut dobj = DataObject::new();

    let s1 = shared.clone();
    let w1 = tokio::spawn(data_writer(2, s1));

    let s2 = shared.clone();

    let mon = tokio::spawn(monitor(dobj, s2, tx));

    while let Some(res) = rx.recv().await {
        println!("{:?}", res);
    }

}

#[tokio::main]
async fn main() {
    run().await;
}

Any help or hint of how to make this work, would be greatly appriated.

I can fix the current compilation error by doing this, but I don't know if it's correct w.r.t. what you are trying to do

async fn monitor(mut dobj: DataObject, shareone:  Arc<RwLock<DataVal>>, tx: Sender<ValueResult>) {
    loop {
        let v = if let Ok(sdata) = shareone.read() {
            if sdata.a_val > dobj.max_val {
                dobj.max_val = sdata.a_val;
                Some(dobj.max_val)
            } else {
                None
            }
        } else {
            None
        };
        if let Some(v) = v {
            match tx.send(Ok(v)).await {
                Ok(_) => {},
                Err(msg) => println!("Error in sending {:?}", msg)
            }
        }
    }
}

Instead if using Arc<RwLock<_>> you might consider using the Actor pattern: Actors with Tokio – Alice Ryhl and store your data inside the Actor instead.

EDIT:
The reason it didn't work is because you're holding the RwLock across an .await point, which makes the whole Future be !Send. What I did was just refactor so that the lock isn't held across an .await point.

2 Likes
std::thread::sleep(std::time::Duration::from_secs(interval));

Don't do this. See here


let mut rng = rand::thread_rng();

This will cause Send errors. I recommend that you avoid long-lived thread-local random number generators like this:

sdata.a_val = rand::thread_rng().gen::<f64>();

if let Ok(sdata) = shareone.read() {
    // something that uses `.await`
}

This will cause Send errors. You should avoid having calls to .read() or .write() in async fns. Consider something reading this article.

2 Likes

Thanks @bes! This is a livesaver! I was looking in all the wrong places for the solution! Need to get used to these idioms! Will definitely look into the Actors as well. Was working with Scala / Akka quite a bit, and I love the actor approach for cases like this. Did just not feel solid enough with the rust / async basics to try to understand yet another crate.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.