Work with mutable in async context

Hello, again for some persons :wink:

I'm working on a little daemon which must deal with a web server game. This daemon must play non players characters in the following way: Non players characters will emit action at defined intervals. Daemon will create websocket (one par area in game world) and transmit websocket messages to concerned non players characters.

To deal with mutable constraints, I image to do this: Non players characters will emit actions without modify themselves (immutable). Part which receive with websocket messages will modify non players characters (mutable.)

There is a sample code:

Cargo.toml

[dependencies]
futures = "0.3.8"

[dependencies.async-std]
version = "1.8.0"
features = ["unstable"]

main.rs

use async_std::task;
use async_std::prelude::*;
use std::time::Duration;
use async_std::stream;

struct NonPlayableCharacter {
    counter: i64,
}
impl NonPlayableCharacter {
    pub async fn do_some_actions(&self) {
        let mut interval = stream::interval(Duration::from_secs(1));
        while let Some(_) = interval.next().await {
            println!("emit {}", self.counter)
        }

    }
    pub async fn react_to_event(&mut self) {
        println!("work as mutable")
    }
}


async fn daemon() {
    let mut non_playable_characters = vec![
        NonPlayableCharacter { counter: 0 },
        NonPlayableCharacter { counter: 1000 },
    ];
    let mut futures = futures::stream::FuturesUnordered::new();

    // Add futures 
    futures.extend(non_playable_characters.iter().map(|npc| npc.do_some_actions()));

    // Simulate two different websocket which receive messages (one websocket per area in world)
    for i in 0..2 {
        let mut npc = &mut non_playable_characters[i];  // for this example, a npc per websocket
        npc.react_to_event();  // modify struct in react to websocket messages
    }

    futures.for_each(drop).await;
}

fn main() {
    task::block_on(daemon())
}

But I can't use both mutable and immutable at once:

error[E0502]: cannot borrow `non_playable_characters` as mutable because it is also borrowed as immutable
  --> src/main.rs:35:28
   |
31 |     futures.extend(non_playable_characters.iter().map(|npc| npc.do_some_actions()));
   |                    ----------------------- immutable borrow occurs here
...
35 |         let mut npc = &mut non_playable_characters[i];
   |                            ^^^^^^^^^^^^^^^^^^^^^^^ mutable borrow occurs here
...
39 |     futures.for_each(drop).await;
   |     ------- immutable borrow later used here

How can I achieve my goal with immutable/mutable/borrow mechanism ? Note non player character state must be shared between "interval actions" and "websocket message react".

Well the definition of a mutable reference is that it provides exclusive access, but you don't have exclusive access, so that's not going to work. I can think of two possible options:

  1. Use interior mutability (Mutex/RefCell/Cell etc.) to enable modifying the struct through an immutable reference.
  2. Do not try to mutate the struct while there exists other references.

The second approach can be done by e.g. spawning a task that sends messages to some code that then accesses the struct. This way, if only that code modifies the struct, then it can have exclusive access when the tasks are out sleeping on a timer.

1 Like

Thanks, i will study these solutions !

Hello again @alice ,

Since your response I work on new solution. But I partially failed with Mutex (I have success to have mutable NonPlayableCharacter , see here but fail to have mutability on vector containing these NonPlayableCharacter).

So I explore the second way ("spawning a task that sends messages to some code that then accesses the struct"). But I don't understand how I can use NonPlayableCharacter vector as mutable. Example:

use async_std::task;
use async_std::prelude::*;
use std::time::Duration;
use async_std::stream;
use async_std::sync::Mutex;
use async_std::stream::Interval;
use async_std::pin::Pin;
use futures::future::join_all;

struct NonPlayableCharacter {
    counter: i64,
}
impl NonPlayableCharacter {
    pub async fn do_some_actions(&self) {
        let mut interval = stream::interval(Duration::from_secs(1));
        while let Some(_) = interval.next().await {
            println!("emit {}", self.counter)
        }
    }

    pub async fn react_to_event(&self) {
        println!("react to event")
    }
}

async fn receive_websocket_messages(non_playable_characters: &Vec<NonPlayableCharacter>) {
    let mut interval = stream::interval(Duration::from_secs(2));
    while let Some(_) = interval.next().await {
        for npc in non_playable_characters.iter() {
            npc.react_to_event().await
        }
    }
}


async fn daemon() {
    let non_playable_characters: Vec<NonPlayableCharacter> = vec![
        NonPlayableCharacter { counter: 0 },
        NonPlayableCharacter { counter: 1000 },
    ];
    let mut futures: Vec<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>> = vec!();

    // Add NPC futures
    for npc in non_playable_characters.iter() {
        futures.push(Box::pin(npc.do_some_actions()))
    }

    // Websocket future
    futures.push(Box::pin(receive_websocket_messages(&non_playable_characters)));

    // QUESTION HERE
    // Imagine previous futures use a channel to send messages collected in other future which
    // can modify NonPlayableCharacter and vector of NonPlayableCharacter
    // If i declare this future here, non_playable_characters must be mutable no ? This is
    // incompatible with previous immutable.
    // Where mutable modifications can be done ?
    
    // Start daemon
    join_all(futures).await;
}

fn main() {
    task::block_on(daemon())
}

The channel idea would avoid the immutable borrow like this:

enum Message {
    PrintCounter {
        npc_index: usize,
    }
}

async fn do_some_action(i: usize, msg: Sender<Message>) {
    let mut interval = stream::interval(Duration::from_secs(1));
    while let Some(_) = interval.next().await {
        msg.send(Message::PrintCounter {
            npc_index: i,
        });
    }
}
while let Some(msg) = recv.recv().await {
    match msg {
        Message::PrintCounter { npc_index } => {
            println!("emit {}", non_playable_characters[npc_index].counter);
        }
    }
}

Only the while loop has any sort of access to the array under this approach.

@alice Hi, as I see in your example, there is no problem with ownership yes. But, I need to be able to modify my non_playable_characters and NonPlayableCharacter.

I'm currently reading documentation on RefCell (but this is hard for me, I do not have high school degrees ...)

There are some great tutorials on what is RefCell and how it's used on youtube, maybe they'll make things a bit clearer:

1 Like

Hello @alice and @qaopm ! I have success with RefCell and Mutex ! But, I have encountered an error when I set up async channel.

Code without usage of channel is working. But when I use my channels, this error happen (Following code is same as working code with channel_sender.send(message).await; uncommented) :

use async_std::pin::Pin;
use async_std::sync::Mutex;
use async_std::task;
use futures::future::join_all;
use std::cell::RefCell;
use std::time::Duration;
use async_std::channel::{unbounded, Sender, Receiver};

use crate::zone::Zone;
use crate::ac::AnimatedCorpse;
use crate::message::Message;

mod ac;
mod zone;
mod message;

async fn on_events(zones: &Mutex<RefCell<Vec<Zone>>>, channel_sender: &Sender<Message>) {
    loop {
        // Simulate websocket events
        task::sleep(Duration::from_secs(2)).await;

        {
            for zone in zones.lock().await.borrow_mut().iter_mut() {
                for message in zone.react() {
                    channel_sender.send(message).await;
                }
            }
        };
    };
}

async fn animate(zones: &Mutex<RefCell<Vec<Zone>>>, channel_sender: &Sender<Message>) {
    loop {
        task::sleep(Duration::from_secs(1)).await; // TODO calculate to have 1 fps
        {
            for zone in zones.lock().await.borrow_mut().iter_mut() {
                for message in zone.animate() {
                    channel_sender.send(message).await;
                }
            }
        };
    };
}

async fn on_messages(channel_receiver: Receiver<Message>) {
    while let Ok(message) = channel_receiver.recv().await {
        match message {
            Message::HelloWorldZone => {println!("HelloWorldZone")}
            Message::HelloWorldAnimatedCorpse => {println!("HelloWorldAnimatedCorpse")}
        }
    };

    // TODO: manage daemon close
    panic!("Channel is closed !")
}

async fn daemon(mut animated_corpses: Vec<Box<dyn AnimatedCorpse + Send + Sync>>) {
    let (channel_sender, channel_receiver) = unbounded();
    let mut zones: Vec<Zone> = vec![];

    // fake here by adding all animated_corpse in same zone
    for i in 0..animated_corpses.len() {
        let animated_corpse = animated_corpses.pop().unwrap();
        let zone = Zone::new(0, i as u32, vec![animated_corpse]);
        zones.push(zone);
    }

    let zones: Mutex<RefCell<Vec<Zone>>> = Mutex::new(RefCell::new(zones));
    let mut futures: Vec<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>> = vec![];

    futures.push(Box::pin(on_events(&zones, &channel_sender)));
    futures.push(Box::pin(animate(&zones, &channel_sender)));
    futures.push(Box::pin(on_messages(channel_receiver)));

    join_all(futures).await;
}

fn main() {
    let mut animated_corpses: Vec<Box<dyn AnimatedCorpse + Send + Sync>> = vec![];
    for i in 0..2 {
        animated_corpses.push(Box::new(ac::rabbit::Rabbit::new(0, i)));
    }

    task::block_on(daemon(animated_corpses))
}
Error is:

error: future cannot be sent between threads safely
  --> src/main.rs:71:18
   |
71 |     futures.push(Box::pin(on_events(&zones, &channel_sender)));
   |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `on_events` is not `Send`
   |
   = help: the trait `Sync` is not implemented for `RefCell<Vec<Zone>>`
note: future is not `Send` as this value is used across an await
  --> src/main.rs:25:21
   |
23 |             for zone in zones.lock().await.borrow_mut().iter_mut() {
   |                         ------------------                       - `zones.lock().await` is later dropped here
   |                         |
   |                         has type `&RefCell<Vec<Zone>>` which is not `Send`
24 |                 for message in zone.react() {
25 |                     channel_sender.send(message).await;
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `zones.lock().await` maybe used later
help: consider moving this into a `let` binding to create a shorter lived borrow
  --> src/main.rs:23:25
   |
23 |             for zone in zones.lock().await.borrow_mut().iter_mut() {
   |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   = note: required for the cast to the object type `dyn futures::Future<Output = ()> + std::marker::Send`

I tried to add + Send + Sync like in Vec<Box<dyn AnimatedCorpse + Send + Sync>> but without success.

What it means ? Can I solve that ? Thanks !

I found a solution ! I was misunderstanding error. By moving channel await outside for loop no problem, exemple:

async fn on_events(zones: &Mutex<RefCell<Vec<Zone>>>, channel_sender: &Sender<Message>) {
    loop {
        // Simulate websocket events
        task::sleep(Duration::from_secs(2)).await;
        let mut messages: Vec<Message> = vec!();

        {
            for zone in zones.lock().await.borrow_mut().iter_mut() {
                messages.extend(zone.react());
            }
        };

        for message in messages {
            channel_sender.send(message).await;
        }
    };
}

I don't have time to look closely, but a Mutex and RefCell serve the same purpose, so pick one of them and don't use both. My guess is you want Mutex as RefCell is not Sync.

Check out this article:

https://tokio.rs/tokio/tutorial/shared-state

1 Like

Thanks for your time, i will take a look !