Storing futures in a BTreeMap

Hi all,

I'm having trouble with storing futures and accessing them later.

I can do this fine

let mut fut = TcpTransport::write_to_stream(
                channel_id,
                socket_addr,
                receiver,
            );
self.pool.spawn(async move { fut.await.unwrap() });

But if I store this future in a BTreeMap<i32,Future<Output = Result<(), MeshError>> then naturally it returns references to the future which doest implement core::Future.

I suspect the way around it is using Pin, but I'm not sure exactly how as using (simulating reference returned from btreemap.get(123))

let fut = Pin::new(&TcpTransport::write_to_stream(
                channel_id,
                socket_addr,
                receiver,
           ));
let fut = *Pin::into_inner(fut);
self.pool.spawn(async move { fut.await.unwrap() });

results in really large errors about not implementing Unpin and other traits
Any help appreciated

Does https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=cf8929aa1cdf21f898df0f181a5a7893 help you?

The pain point is that a Future cannot be moved (it must be pinned in place) after it's been polled, so that self-references inside the Future, which are created in async blocks when the async block keeps a reference alive over an await statement.

Pin provides a guarantee that the thing you are referencing via the Pin is pinned in place, even though the Pin is not itself pinned in place. If the thing you are referencing is Unpin, then it does not contain self references, or anything else that would result in it becoming invalid if it is moved.

A BTreeMap can move its values around in order to ensure that the B-Tree invariants are maintained, and thus does not provide a way to directly pin the things it contains in place. Instead, you can use a suitable smart pointer (such as Box or Arc) to contain the Future, and then maintain a map to Pin<Box<_>> or Pin<Arc<_>>, which do ensure that the thing you're pointing to is pinnned in place.

1 Like

Hi @farnz, thanks for that. It almost helps

I've updated it to Playground except in the real life example I'm using ThreadPool rather than LocalPool.

Where you have Box::pin, I replaced with Arc::pin but with no luck. I think nearly there...but not quite

Code looks like

let mut fut = Arc::pin(TcpTransport::write_to_stream(
                channel_id,
                socket_addr,
                receiver,
            ));
            self.conn_futures.insert(channel_id, fut);
            if let Some(fut) = self.conn_futures.get_mut(&channel_id) {
                self.pool.spawn(async move { fut.await.unwrap() });
            }

Error is

error[E0277]: the trait bound `std::sync::Arc<(dyn core::future::future::Future<Output = std::result::Result<(), error::MeshError>> + 'static)>: std::ops::DerefMut` is not satisfied
  --> src\transport.rs:73:46
   |
73 |                 self.pool.spawn(async move { fut.await.unwrap() });
   |                                              ^^^^^^^^^ the trait `std::ops::DerefMut` is not implemented for `std::sync::Arc<(dyn core::future::future::Future<Output = std::result::Result<(), error::MeshError>> + 'static)>`
   |
   = note: required because of the requirements on the impl of `core::future::future::Future` for `std::pin::Pin<std::sync::Arc<(dyn core::future::future::Future<Output = std::result::Result<(), error::MeshError>> + 'static)>>`

error: aborting due to previous error

Ah - you now need to extract an exclusive reference from the Arc, not just a shared reference. It looks like Arc::get_mut can do this, but only if there are no other references to the Arc.

So that's my mistake - Arc is not particularly useful here. You need a pinned pointer (which Arc can provide), but you also need to get exclusive references from it, which Arc can't provide easily.

Hi @farnz do you by any chance have an example? I've update the playground to this which is close but has an unexpected error

   Compiling playground v0.0.1 (/playground)
error[E0277]: the trait bound `std::sync::MutexGuard<'_, impl core::future::future::Future>: core::future::future::Future` is not satisfied
   --> src/main.rs:40:9
    |
40  |         x.await;
    |         ^^^^^^^ the trait `core::future::future::Future` is not implemented for `std::sync::MutexGuard<'_, impl core::future::future::Future>`

MutexGuard is Send so that's one thing, but once I have the lock() I should be able to await on the future?

Thank you so much for helping

I've not put Future inside an Arc before - only inside a Box.

Even if you get this to run via Arc you probably shouldn't. The idea of spawn is that the ownership of a Futures get handed over to an executor, which drives it to completion. Trying to access the Future from any other place will just cause mutability issues.

If you don't need to hand over ownership to an executor, Box::pin and storing Pin<Box<dyn Future>> will be ok as has already been pointed out.

If you spawn tasks you might rather want to reference the output of the Future than the Future itself. E.g. if every task writes to a channel at some point of time, store the other side of the channel in your map. Or store the JoinHandle for the task, which some executors provide.

Hi @Matthias247, I understand that the executor will own the Future. But even we reduce the example to

    let mut pool = LocalPool::new();
    let fut = Arc::pin(Mutex::new(async {1}));
    let spawner = pool.spawner();
    

    spawner.spawn(async move { 
        let guard = fut.lock().unwrap();
        let x :i32 = guard.await;
    });

What is unsound about this?
Box is not Send+Sync so an Arc should be used instead I would have thought, but I cannot seem to figure out why it doesn't work?

This has quite a few problems: First of all you are using a synchronous mutex inside the async fn to lock the Mutex and to obtain the Future. This would block the whole executor. If it's a singlethreaded executor it would deadlock. But likely it won't even be accepted by the compiler, since the mtuex can't be hold across the await point.

Then this only would only pin and execute the future inside the spawn block as part of the await. What would you want to do with that Future outside of the spawn block afterwards? Since the Future has completed at that point of time, you are not allowed to poll it again. The result would just be undefined.

After some more thought, help and experimentation :slight_smile:

#[test]
fn thread_pool() {
    use futures::executor::ThreadPool;
    use async_std::sync::Mutex;
    let pool = ThreadPool::new().unwrap();
    let fut =  Arc::new(Mutex::new(Box::pin(async {1})));


    pool.spawn(async move { 
        let mut guard = fut.lock().await;
        let x :i32 = (&mut *guard).await;
    });
}

This compiles and runs successfully. But yes, it gives rise to the obvious issue @Matthias247 pointed out, is that when the future completes, if it were stored in a map, it would be undefined