Mutable struct fields with async/await

Hi, I'm trying to have a struct with multiple mutable fields at the same time, for example -

struct ToBeMutable {
    a: Foo,
    b: Bar,
}

where both a and b can be mutable at the same time. I understand that this can't be done directly, as I'd end up with multiple mutable references to ToBeMutable.
So I tried using RefCell, and it works fine in general. But the problem comes when I combine RefCell with async/await:

error: future cannot be sent between threads safely
   --> src/main.rs:10:5
    |
10  |     tokio::spawn(execute(imut));
    |     ^^^^^^^^^^^^ future returned by `execute` is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: the trait `std::marker::Sync` is not implemented for `std::cell::Cell<isize>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:16:5
    |
15  |     let a_foo = imut.get_a();
    |         ----- has type `std::cell::RefMut<'_, Foo>` which is not `Send`
16  |     async {}.await;
    |     ^^^^^^^^^^^^^^ await occurs here, with `a_foo` maybe used later
17  | }
    | - `a_foo` is later dropped here

Playground
Is there any workaround?

The module-level documentation explains why RefCell fails you here:

neither Cell<T> nor RefCell<T> are thread safe (they do not implement Sync). If you need to do aliasing and mutation between multiple threads it is possible to use Mutex, RwLock or atomic types.

Using async implies multithreading unless you use a single-threaded executor.
Also, Tokio provides Mutex and RwLock for use in an async context.
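
For reference, here's a minimal sketch of that (assuming a Tokio version with the relevant features enabled; the field types are simplified to i32 for illustration):

use std::sync::Arc;
use tokio::sync::RwLock;

struct ToBeMutable {
    a: i32,
    b: i32,
}

#[tokio::main]
async fn main() {
    // One shared, reference-counted value; many readers or one writer at a time.
    let shared = Arc::new(RwLock::new(ToBeMutable { a: 0, b: 0 }));

    let writer = shared.clone();
    let task = tokio::spawn(async move {
        // The async write guard is fine to hold across .await points if needed.
        let mut guard = writer.write().await;
        guard.a += 1;
        guard.b += 1;
    });

    task.await.unwrap();
    println!("a = {}", shared.read().await.a);
}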

Another solution I can think of is spawning a local task using spawn_local, which allows the future to be !Send; in other words, it runs on the same thread and doesn't have the multithreading requirement.
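
Roughly like this (a sketch assuming a Tokio version with LocalSet; Foo's field is made up):

use std::cell::RefCell;
use std::rc::Rc;
use tokio::task::LocalSet;

struct Foo {
    n: i32,
}

#[tokio::main]
async fn main() {
    // A LocalSet lets us spawn !Send futures; everything stays on this thread.
    let local = LocalSet::new();
    local.run_until(async {
        let shared = Rc::new(RefCell::new(Foo { n: 0 }));
        let shared2 = shared.clone();
        tokio::task::spawn_local(async move {
            // Rc and RefCell are !Send, but that's fine here: the task
            // never moves to another thread.
            shared2.borrow_mut().n += 1;
        })
        .await
        .unwrap();
        println!("{}", shared.borrow().n);
    })
    .await;
}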

You probably should not be using Tokio's mutex, but instead the mutex from std (or parking_lot). You can read more about this on Tokio's mutex's documentation, and in the mini-redis example.

Bear in mind that I am a Rust neophyte and have only been trying to get anything using tokio working for a week. But I now have a nice application running that shares things between tokio threads using Arc, Mutex and clone:

First we define the object we want to share and a "new()" method on it for convenience.

struct ToBeMutable {
    a: i32,
    b: i32,
}

impl ToBeMutable {
    fn new () -> Self {
        Self {
            a: 0,
            b: 0,   
        }
    }
}

Then in our tokio code we can create the shared thing and use it from many threads:

    use std::sync::Arc; 
    use tokio::sync::Mutex;

    // Create our shared mutable thing
    let to_be_mutable = ToBeMutable::new();

    // Make two reference-counted, mutex-protected references to our thing
    let mutable_1 = Arc::new(Mutex::new(to_be_mutable));
    let mutable_2 = mutable_1.clone();

    // Spawn two threads that mutate the thing via the two different references.
    tokio::spawn(async move {
        loop {
            let mut mutable_1 = mutable_1.lock().await;
            (*mutable_1).a -= 1;
            //...
        }
    }); 

    tokio::spawn(async move {
        loop {
            let mut mutable_2 = mutable_2.lock().await;
            (*mutable_2).b -= 1;
            //...
        }
    });

I have no idea if this is the best way to do it. It's just what I came up with during a week of fighting with tokio.

I hope the cost of all that cloning is not major.

I'm curious to know why you say that, having spent hours this week figuring out how to do exactly that myself.

The Tokio mutex doc you linked to suggests to me that I'm doing the right thing:

If the data behind the mutex is just data (and not, say, an IO primitive), you rarely need to keep it locked across an .await, and if you don't need to keep it across an .await, then you should not be using an async mutex.

I should probably rewrite the section on when to use which mutex. It's not super clear right now.

A pattern I see sometimes is to wrap the Arc<Mutex<...>> in a struct, and provide methods on that struct. This pattern makes it impossible to accidentally hold the lock across an .await, although the fact that MutexGuard is not Send already makes that difficult.

use std::sync::{Arc, Mutex};

pub struct ToBeMutable {
    pub a: i32,
    pub b: i32,
}

#[derive(Clone)]
pub struct ToBeMutableHandle {
    inner: Arc<Mutex<ToBeMutable>>,
}

impl ToBeMutableHandle {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(ToBeMutable {
                a: 0,
                b: 0,
            }))
        }
    }
}

Using this pattern, you can define a method like this:

impl ToBeMutableHandle {
    pub fn with_lock<F, T>(&self, func: F) -> T
    where
        F: FnOnce(&mut ToBeMutable) -> T,
    {
        let mut lock = self.inner.lock().unwrap();
        let result = func(&mut *lock);
        drop(lock);
        result
    }
}

and common methods can be implemented like this:

impl ToBeMutableHandle {
    pub fn get_and_decrement_b(&self) -> i32 {
        self.with_lock(|shared| {
            shared.b -= 1;
            shared.b
        })
    }
}

These can then be used like this (shared_1 and shared_2 below are clones of the same ToBeMutableHandle, one for each task):

let task_1 = tokio::spawn(async move {
    for _ in 0..5 {
        let twice_a = shared_1.with_lock(|shared| {
            let twice_a = 2 * shared.a;
            shared.a -= 1;
            twice_a
        });
        println!("Twice a is {}", twice_a);

        delay_for(Duration::from_millis(10)).await;
    }
});

let task_2 = tokio::spawn(async move {
    for _ in 0..5 {
        let b = shared_2.get_and_decrement_b();
        println!("shared.b is {}", b);

        delay_for(Duration::from_millis(10)).await;
    }
});

The reason this makes it impossible to keep the lock across an .await is that the closure passed to with_lock does not allow the use of .await inside it.

Data that you need at .awaits can be returned from the with_lock closure, and be used normally.

Edit: I wrote this before seeing your post above, if that makes any difference.

Now I'm more confused.

Our OP asked exactly about sharing mutable data across async/await. I took that to mean sharing between async threads. Surely that demands guarding with a mutex? Or am I missing a point.

Actually there is another confusion in my mind. Having spent the week trying to get something working that spawns a bunch of threads for handling serial ports, socket server connections, and NATS messaging connections, using tokio I sit back and look at it all and wonder what I have created?

To my naive mind such an async system, running on one core and using one OS thread, would be running all those tokio threads in the same OS thread context. As such, whatever mechanism is used to guard shared data across tokio threads would not need pthread-style mutexes, with their use of actual atomic machine operations like lock exchange down in their bowels.

But, if I understand correctly, tokio threads can indeed end up being run on multiple OS threads from a tokio thread pool. In which case those async threads do indeed need proper mutex guards.

I notice that when my tokio program runs it uses 6 OS threads.

And lastly, I don't understand why you suggest using the mutex from std rather than Tokio's mutex.

I'm confused. Seems I have something working very well here, but I have no idea how or why! :frowning:

I think the confusion here is that there is a difference between sharing mutable data with code that runs inside an async function, and actually keeping the mutex lock active across an .await. For example, compare this

// This requires an async mutex
// (or rewrite to not keep it across an .await)
struct Shared {
    i: i32,
}

async fn foo(data: Arc<Mutex<Shared>>) {
    let mut lock = data.lock().unwrap();
    
    lock.i += 1;
    do_async_stuff().await;
    
    // this is after an .await
    println!("{}", lock.i);
}

and this

// This should prefer an std mutex
struct Shared {
    i: i32,
}

async fn foo(data: Arc<Mutex<Shared>>) {
    {
        let mut lock = data.lock().unwrap();
    
        lock.i += 1;
        println!("{}", lock.i);
        
        // lock goes out of scope here before .await
    }
    do_async_stuff().await;
}

When the lock itself is not kept across an .await, it is much more performant to use an std mutex. The async mutex is explicitly optimized for uses with long critical sections, and is written to be very fair, which has an impact on performance, but is good for tail latencies when you have long critical sections.

The reason that .awaits are so important boils down to how Tokio can run many things on a few threads. It does this by swapping the current task whenever it yields control, and tasks can only yield control at .await points. This means that if the lock is not kept across an .await, then the task will not yield control until after releasing the lock again.

OK. Can I relate async/tokio to things I'm more familiar with from history:

Prehistoric Times: We programmed our single-processor 8/16-bit computers in assembler. The only things one could imagine calling threads were the main program loop and the interrupt handlers. Mutual exclusion was performed by disabling/enabling interrupts, globally with instructions like DI/EI or individually by writing to the interrupt controller. Nice and simple, a handful of bytes of code.

Medieval Times: We programmed our 16/32-bit machines in higher-level languages like Coral, PL/M, Pascal, Ada, C. We had cooperative multi-tasking schedulers. I wrote one for a military comms system. Those threads had to cooperate, being sure to suspend themselves frequently to avoid hanging the whole system. Effectively there was only one real thread of execution hopping between many cooperative threads, each with its own stack. Could we call them "green threads" nowadays? Of course there were still those interrupts to deal with, and for multi-processor interaction we had hardware support like atomic lock-exchange instructions. Not so simple, a lot more code, but it could still run in kilobytes.

Modern Times: We have real threads, supported by many real-time embedded operating systems and things like Linux and Windows. We never see an interrupt. We don't have to be cooperative anymore. Mutual exclusion shows up as things called "mutex" and the like. Massively more complex, millions of lines of code. So much code between hardware and application that it's amazing it performs as well as it does. (But still, I can parallelize C code over 8 cores on a single-chip MCU with only 32K of RAM using OpenMP!)

This Century: Async shows up in many guises in languages from JavaScript to C++. And of course Rust. I have no idea what is going on anymore :frowning:

Am I right in assuming that Tokio threads are rather like the threads of the cooperative scheduling systems I mentioned above? Effectively a single real thread juggled by the Tokio runtime? We have to take care not to hog our thread, or else everything hangs up? Except, unlike those old cooperative threads, our async code is compiled into some huge state machine that hops around attending to all our threads, all in a single stack space? And except there may be more than one real thread in play, potentially on many cores?

I start to see why Tokio would need its own mutex between the threads it is juggling. And why, in some cases, a standard mutex will do just fine.

Anyway, I swapped out a Tokio mutex for a standard one in one case in my first attempt at a Tokio program and it works just fine! I have no idea how that impacts my performance, though.

Yes, exactly. Tokio will spawn multiple threads by default (one per cpu core) to make it more performant, but it can totally run on a single thread using the basic scheduler. And it is indeed cooperative in the sense that if you spend a long time without reaching an .await, you will starve other tasks on the scheduler, as Tokio can only switch between tasks at .awaits.

Yes, exactly. Async/await has the compiler produce a giant enum that expresses a state machine for that operation, with a state at each .await.

This means that async functions don't have a stack in the same way your medieval-times threads did. Instead, the future object knows how large it is, and when the future is spawned, Tokio allocates exactly the maximum amount of memory that this future will ever need. Note that this is why you cannot have recursive async functions without introducing boxing to dynamically grow the "stack space".
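
As an aside, a minimal sketch of what that boxing workaround looks like (the function here is made up for illustration):

use std::future::Future;
use std::pin::Pin;

// A recursive async fn must return a boxed future so that its size is known.
fn countdown(n: u32) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    Box::pin(async move {
        if n > 0 {
            println!("{}", n);
            countdown(n - 1).await;
        }
    })
}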

By not keeping the lock across an .await, you make the critical sections of the lock much much smaller, as the task is never suspended while holding a lock.

Note that if it were possible to keep an std mutex lock across an .await, you could run into deadlocks: if the task holding the lock is currently suspended and some other task tries to acquire the lock, then, since the locking call does not use .await, the thread blocks and the task holding the lock may never get to run again and release it. The main feature provided by the async mutex is the ability to yield control while waiting for the lock, which means that you cannot run into deadlocks in this way.
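
For comparison, a rough sketch of the earlier "keep it across an .await" example written against the Tokio mutex (do_async_stuff is just a stand-in):

use std::sync::Arc;
use tokio::sync::Mutex;

struct Shared {
    i: i32,
}

async fn do_async_stuff() {}

async fn foo(data: Arc<Mutex<Shared>>) {
    // lock() is itself a future: a waiting task yields instead of blocking
    // the executor thread, so holding the guard across an .await is allowed.
    let mut lock = data.lock().await;

    lock.i += 1;
    do_async_stuff().await;

    println!("{}", lock.i);
}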

Additionally, because the Tokio mutex's primary use case involves much longer lock durations, the mutex is fair in the sense that locks are guaranteed to be given out in first-come, first-served order; however, this fairness is somewhat costly. The cost doesn't matter when you hold the lock for a long time anyway, but when the mutex is used only between .awaits with very short critical sections, it becomes rather large.

That’s my understanding. async and all the other “green thread” systems are about using a single OS thread to simulate several threads for performance reasons, via cooperative yielding. Rust uses explicit yield points and some other languages hide it inside the IO calls, but they’re otherwise the same thing.

If you use a single-threaded executor, you also get things that are easier to reason about because everything that happens between await points can be considered atomic, but that comes at the expense of a long-running computation freezing progress on all threads.

Since you're not afraid of some low-level, potentially unsafe programming, you might want to try writing your own async scheduler as an exercise; it's not that hard to do if you don't care about performance, and it's the best way I know to understand what's actually going on.

Thanks alice and 2e71828. I feel a bit more confident about it all now.

Do you mean I have to do something special to get it to use all my cores? Do I have to build a runtime for each core myself, or what?

Oddly enough, back in the mid '80s I worked with some guys who wanted to port thousands of lines of some protocol stack to the IBM PC and MS-DOS. It was written for some threaded OS; DOS has no threads. I ended up implementing all the OS API they needed as state machines, one per thread if I recall correctly, and my runtime hopped between them. Worked a treat.

What was old is new again. But different.

Out of curiosity, is there a way to see the source code of the mess of state machines the compiler generates for async code?

That is an interesting idea. Where would I read about what such a thing needs to do?

What I'd like to know is what happens if I want to move my async code to some embedded microcontroller. I'm guessing Tokio is not going to fly there, and creating one's own runtime may be required.

The only real requirement is that it repeatedly calls poll on one or more Futures until they return Ready. To do that, you’ll need to make a Context that lets the future notify you when it’s ready to do more work. That’s ultimately done by writing the functions for a RawWakerVTable.

For an extremely minimal example, you can look at my attempt for a single-future one: Is this a valid async executor implementation?
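
If it helps, here is a rough single-future block_on sketch along those lines; it uses std's Wake trait instead of a hand-rolled RawWakerVTable, and all the names are my own:

use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waking the task just unparks the thread that is doing the polling.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Nothing to do until the future wakes us up again.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    println!("{}", block_on(async { 21 * 2 }));
}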

No, I mean you have to do something special to get it to not use all of your cores. Check out the tokio::runtime module for more info.

No, but there are a few threads on this forum where I have manually written the state machine out for an async function. One of them is here.
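
Very roughly, and glossing over the pinning details, a hand-written equivalent of something like async fn double(inner: impl Future<Output = i32>) -> i32 { inner.await * 2 } could look like this (the names are made up, and the real compiler output is considerably more involved):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// The states of the hand-written machine: either still waiting on the
// inner future, or finished.
enum Double<F> {
    Waiting(F),
    Done,
}

impl<F: Future<Output = i32> + Unpin> Future for Double<F> {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        match &mut *self {
            Double::Waiting(inner) => match Pin::new(inner).poll(cx) {
                Poll::Ready(value) => {
                    *self = Double::Done;
                    Poll::Ready(value * 2)
                }
                Poll::Pending => Poll::Pending,
            },
            Double::Done => panic!("polled after completion"),
        }
    }
}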

You're going to need a specialized executor for your micro-controller instead of Tokio, but the async/await feature should totally work there.

Depending on your requirements, it might just be enough to do the following:

let mut tbm = ToBeMutable { a: Foo, b: Bar };
// […]
let a = &mut tbm.a;
let b = &mut tbm.b;

You can then use the different pieces for different things.

You can also write:

let ToBeMutable { a, b } = &mut tbm;

If you don't need all fields or want the variables to have different names, you can write:

let ToBeMutable { a: my_var, b: _ } = &mut tbm;

(OP: sorry to contribute to derailing this thread, but I hope the question is answered by the posts above)

@ZiCog, here are some high quality resources:
Stjepan's two articles about building an executor Part 1, part 2.

For embedded I think this could be an interesting read (from the part I linked to and to the end of that chapter).

I also went through this exploration about async, stackless coroutines, generators and what they compile to and detailed what I discovered here - albeit the quality is probably not as high as the two previous resources.

What was old is new again. But different.

I found your trip through history pretty amusing. It's interesting to see how things evolve.

Thanks for all the great, but off topic, info. I might be opening a new thread re: my first tokio program as I have more questions arising from it...

Thank you guys for taking the time to reply; I really appreciate it.
Locking across await or not, I wasn't really looking for any specific solution. I am fairly new to Rust, so I'm open to any and every idea.

I don't mind that the discussion took a little turn, as it's still somewhat relevant and also helped me clarify some ideas regarding the async runtime.

I've never worked on embedded systems, but I think you might find this one interesting. (EDIT:) Also this one.

Thanks, that is interesting indeed.