Creating a Send future by joining an iterator wrapped in an Arc Lock

Hello, I'm having a small problem creating a Send future when joining an iterator over futures in my async function. That isn't a problem by itself, but in my case I have to create the futures from an iterator over an RwLockReadGuard, which results in a "future cannot be sent between threads safely" error.

error: future cannot be sent between threads safely
  --> src/main.rs:13:21
   |
13 |         assert_send(&b);
   |                     ^^ future returned by `new` is not `Send`
   |
   = help: within `impl futures::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, Vec<i32>>`
note: future is not `Send` as this value is used across an await
  --> src/main.rs:22:25
   |
19 |     let a = a.read().unwrap();
   |         - has type `std::sync::RwLockReadGuard<'_, Vec<i32>>` which is not `Send`
...
22 |     let c = join_all(b).await;
   |                         ^^^^^ await occurs here, with `a` maybe used later
note: required by a bound in `assert_send`
  --> src/main.rs:4:19
   |
4  | fn assert_send<T: Send>(_: &T) {}
   |                   ^^^^ required by this bound in `assert_send`

Here is a simplified version of my code:

use std::sync::{Arc, RwLock};
use futures::{future::join_all, join};

fn assert_send<T: Send>(_: &T) {}
async fn add_one(n: i32) -> i32 {
    n + 1
}

/// ===
fn main() {
    smol::block_on(async {
        let vec = Arc::new(RwLock::new(vec![1, 2, 3, 4]));
        let out = test(vec);
        assert_send(&out);
        out.await;
    });
}

async fn test(a: Arc<RwLock<Vec<i32>>>) {
    let a = a.read().unwrap();

    let b = a.iter().map(async move |n| join!(add_one(*n)).0);
    let c = join_all(b).await;

    println!("{:?}", c);
}

Now, in this specific case I can just copy the values out of a by cloning, but in my actual code the Vec stores Box<dyn Trait> values that don't implement Copy/Clone, so that isn't an option.

One thing I can do is wrap every element of the Vec in an Arc so that the Vec implements Clone, but having an atomic reference counter on every element doesn't feel right, so I'm wondering if there is a better solution to this problem.

Note that the data passed to the async function has to be wrapped in an Arc plus something that allows mutation, because it has already been passed to another thread that can modify it at an unknown time.

My apologies if something is unclear. This is a fairly specific and complicated problem, so I'm not sure I did a good job of explaining it. Please feel free to ask for clarification if I wrote something unclearly.

The simple solution is just to wrap the Vec<_> in another Arc to clone it and have async fn test(a: Arc<RwLock<Arc<Vec<i32>>>>). But this is hacky. Since you need to hold a lock on the data through an .await point, you likely want an async lock like tokio::sync::RwLock instead of std::sync’s.

[1] You don’t need smol in your example: pollster is built to provide a single, simple block_on function for evaluating a future. If you use smol in your real code, their async lock is called smol::lock::RwLock.
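
A minimal, dependency-free sketch of the Arc<Vec<_>> approach, assuming the nested type from the signature above. The names snapshot_add_one, NoopWake and block_on are hypothetical. block_on is a busy-polling stand-in for pollster or smol that is only adequate for futures that are ready immediately, and the sequential loop stands in for join_all. The read guard is a temporary that is dropped at the end of the let statement, so it is never held across an .await and the future stays Send:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

fn assert_send<T: Send>(_: &T) {}

async fn add_one(n: i32) -> i32 {
    n + 1
}

// Hypothetical helper: clone the inner Arc under a short-lived lock, then
// do all the async work on the snapshot.
async fn snapshot_add_one(a: Arc<RwLock<Arc<Vec<i32>>>>) -> Vec<i32> {
    // The guard is a temporary here; it is released when this statement ends.
    let snapshot: Arc<Vec<i32>> = Arc::clone(&*a.read().unwrap());

    // Sequential awaits (assumption: used instead of join_all to avoid
    // pulling in the futures crate).
    let mut out = Vec::with_capacity(snapshot.len());
    for n in snapshot.iter() {
        out.push(add_one(*n).await);
    }
    out
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for an executor: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Cloning the inner Arc only bumps a reference count, so the elements themselves do not need to implement Clone. Passing the future returned by snapshot_add_one to assert_send compiles, and running it with block_on on vec![1, 2, 3, 4] yields [2, 3, 4, 5].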


The lock guard of the standard library's RwLock cannot be sent to a different thread; this is by design. If you want to hold a lock across await points, you should use an async-aware lock, such as tokio::sync::RwLock or async_lock::RwLock (which is re-exported as smol::lock::RwLock).

Instead of mapping the iterator to futures, you can use a stream. I'll use futures_lite::stream in the following example, but there are equivalent APIs in the futures crate too:

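// RwLock here is an async-aware lock such as smol::lock::RwLock,
// not std::sync::RwLock.
async fn test(a: Arc<RwLock<Vec<i32>>>) {
    use futures_lite::stream::StreamExt;
    // read() on an async lock returns a future, hence the .await.
    let a = a.read().await;
    // Turn the borrowed iterator into a stream of &i32.
    let b = futures_lite::stream::iter(a.iter());
    // Run add_one on each item and collect the results.
    let c: Vec<_> = b.then(|x| add_one(*x)).collect().await;
    println!("{:?}", c);
}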

Yep, I did mention wrapping the i32 in an Arc. I was not aware of async RwLock, however, so thanks a lot.
Also thank you @nerditation for providing an example!

I disagree with the “hacky” part. If the goal is solely to read the data (as in the example code @laycookie posted), then holding the lock across the entire reading means that as long as test is iterating, it prevents writing to the data. That might sometimes be desirable semantically, but if it is not, and if there will be many concurrent reads and writes, it's a needless performance bottleneck, and the Arc<Vec<i32>> is a better solution.

Further improvements can be made on that data structure:

  • If the Vec's length will never be changed in the future, then store an Arc<[i32]> instead, to save one indirection and allocation.
  • If the only way this RwLock is used is replacing the Vec entirely, then you can replace the RwLock with arc_swap::ArcSwap, making the final type Arc<ArcSwap<[i32]>>. This has the advantage that writers never block readers.
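
A rough sketch of the first suggestion using only the standard library, with hypothetical names Shared, snapshot and replace. It keeps the RwLock rather than ArcSwap, so writers still take a brief lock, but a reader that holds a snapshot no longer blocks them:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical alias: the lock protects only the pointer to the slice.
type Shared = Arc<RwLock<Arc<[i32]>>>;

// Take the read lock just long enough to clone the inner Arc.
fn snapshot(shared: &Shared) -> Arc<[i32]> {
    Arc::clone(&*shared.read().unwrap())
}

// Writers replace the whole slice instead of mutating it in place.
fn replace(shared: &Shared, new: Vec<i32>) {
    let new: Arc<[i32]> = Arc::from(new);
    *shared.write().unwrap() = new;
}
```

A snapshot taken before a call to replace keeps pointing at the old data, while later snapshots see the new slice. The old allocation is freed once the last snapshot of it is dropped.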