Multiple mutable borrows even though first is dropped?

Say I've got a function that listens for process terminations, where each process termination is represented by a future that resolves when the process terminates. I stick these in a tokio::task::JoinSet so that I can add more of these futures over time. You might write something like this:

async fn fake_process_termination_fut() {}

async fn wait_for_terminations(mut additions: mpsc::Receiver<String>) {
    let mut terminations = JoinSet::new();
    let mut abort_handles: BTreeMap<String, AbortHandle> = BTreeMap::new();
    let abort_handle = terminations.spawn(fake_process_termination_fut());
    //                 ^^^^^^^^^^^^ first mutable borrow here
    abort_handles.insert("first".to_string(), abort_handle);
    loop {
        let next_termination = terminations.join_next().fuse();
        let next_addition = additions.recv().fuse();
        tokio::pin!(next_termination, next_addition);
        futures::select! {
            term = next_termination => {
                debug!("a process terminated");
            }
            name = next_addition => {
                let name = name.unwrap();
                let handle = terminations.spawn(fake_process_termination_fut());
                //           ^^^^^^^^^^^^ second mutable borrow here
                abort_handles.insert(name, handle);
            }
        }
    }
}

However, if you do this you get an error about multiple mutable borrows:

error[E0499]: cannot borrow `terminations` as mutable more than once at a time
   --> src/terminations.rs:268:30
    |
258 |         let next_termination = terminations.join_next().fuse();
    |                                ------------ first mutable borrow occurs here
...
268 |                 let handle = terminations.spawn(fake_process_termination_fut());
    |                              ^^^^^^^^^^^^ second mutable borrow occurs here
...
272 |     }
    |     - first borrow might be used here, when `next_termination` is dropped and runs the destructor for type `futures::future::Fuse<impl futures::Future<Output = std::option::Option<std::result::Result<(), tokio::task::JoinError>>>>`

This makes some kind of sense to me. When I create the future for next_termination it may have a reference to something inside terminations, and that reference may still be alive due to the first branch of the select!. I'm not sure what the select! macro expands to, so this sounds plausible to me.

To fix this, I rewrite the function slightly so that I get access to the unresolved next_termination future so that I can drop it and release the borrow:

async fn wait_for_terminations2(mut additions: mpsc::Receiver<String>) {
    let mut terminations = JoinSet::new();
    let mut abort_handles: BTreeMap<String, AbortHandle> = BTreeMap::new();
    let abort_handle = terminations.spawn(fake_process_termination_fut());
    //                 ^^^^^^^^^^^^ first mutable borrow here
    abort_handles.insert("first".to_string(), abort_handle);
    loop {
        let next_termination = terminations.join_next().fuse();
        let next_addition = additions.recv().fuse();
        tokio::pin!(next_termination, next_addition);
        match futures::future::select(next_termination, next_addition).await {
            Either::Left((termination, _next_addition)) => {
                debug!("a process terminated");
            }
            Either::Right((addition, _next_termination)) => {
                // intentionally drop the unresolved future here
                drop(_next_termination);
                let name = addition.unwrap();
                let handle = terminations.spawn(fake_process_termination_fut());
                //           ^^^^^^^^^^^^ second mutable borrow here
                abort_handles.insert(name, handle);
            }
        }
    }
}

This gives me the same error even though I'm explicitly dropping the unresolved future (_next_termination) before calling spawn.

This is the part that breaks my brain a little bit. What's going on?

When you call tokio::pin!(next_termination), you shadow the owned next_termination variable with a reference to it, Pin<&mut impl Future...>. So, dropping that does nothing — the original Future is still owned by the shadowed variable, and therefore still exists.

But you shouldn't need pin! for this case — it is only needed if you are polling a future created outside a loop from inside a loop.
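To make the shadowing point concrete, here is a minimal std-only sketch. `Tracked` and `drop_pinned_reference` are hypothetical names standing in for the future and the loop body; the code only mimics the shape of what `pin!` does (shadowing an owned value with a `Pin<&mut _>`), it is not the macro itself.

```rust
use std::cell::Cell;
use std::pin::Pin;

/// Hypothetical stand-in for the future: sets a flag when it is really dropped.
struct Tracked<'a> {
    dropped: &'a Cell<bool>,
}

impl<'a> Drop for Tracked<'a> {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

/// Returns (flag right after dropping the pinned reference,
///          flag after the enclosing scope has ended).
fn drop_pinned_reference() -> (bool, bool) {
    let dropped = Cell::new(false);
    let after_dropping_pin;
    {
        let mut value = Tracked { dropped: &dropped };
        // Same shape as the shadowing done by `pin!`: the name now refers to
        // a pinned reference, while the owned value stays alive underneath.
        let value: Pin<&mut Tracked<'_>> = Pin::new(&mut value);
        drop(value); // drops only the `Pin<&mut Tracked>`, not the `Tracked`
        after_dropping_pin = dropped.get();
    } // the shadowed, owned `Tracked` is dropped here
    (after_dropping_pin, dropped.get())
}
```

Calling `drop_pinned_reference()` should give `(false, true)`: the destructor has not run after `drop(value)`, and only runs when the scope that owns the shadowed value ends.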

If you remove next_termination from the tokio::pin call you get an error that next_termination cannot be unpinned.

Also, if it was simply a shadowing issue, I would expect to be able to slap a .boxed() on the next_termination future, but that doesn't work either (same error as before):

let next_termination = terminations.join_next();
let next_addition = additions.recv();
tokio::pin!(next_addition);
match futures::future::select(next_termination.boxed(), next_addition).await {
    Either::Left((termination, _next_addition)) => {
        debug!("a process terminated");
        drop(termination);
    }
    Either::Right((addition, _next_termination)) => {
        drop(_next_termination);
        let name = addition.unwrap();
        let handle = terminations.spawn(fake_process_termination_fut());
        abort_handles.insert(name, handle);
    }
}

I don't have an explanation for you, but this compiles. It seems to think that the Either can drop after being destructured, otherwise. This issue, maybe?

It looks like you can get it to compile if you construct the future as a temporary, but I'm still not quite sure why that works:

async fn wait_for_terminations(mut additions: mpsc::Receiver<String>) {
    let mut terminations = JoinSet::new();
    let mut abort_handles: BTreeMap<String, AbortHandle> = BTreeMap::new();
    let abort_handle = terminations.spawn(fake_process_termination_fut());
    abort_handles.insert("first".to_string(), abort_handle);
    loop {
        let next_addition = additions.recv().fuse();
        tokio::pin!(next_addition);
        futures::select! {
            // Construct the future inline
            term = terminations.join_next().fuse() => {
                debug!("a process terminated");
            }
            name = next_addition => {
                let name = name.unwrap();
                let handle = terminations.spawn(fake_process_termination_fut());
                abort_handles.insert(name, handle);
            }
        }
    }
}

My understanding is that JoinSet::join_next() will create a future with an &mut self reference

pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
    crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
}

meaning that your

let next_termination = terminations.join_next().fuse();

will create a future that mutably borrows terminations.
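As a rough illustration of that borrow, here is a std-only sketch with a hand-written future instead of an `async fn`. `TaskSet`, `JoinNext`, `NoopWaker` and `poll_then_mutate` are all made-up names, not tokio's types. The empty `Drop` impl is an assumption chosen to imitate how the compiler treats the opaque future in the error message above, where the borrow lasts until the destructor runs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `JoinSet`.
struct TaskSet {
    finished: Vec<u32>,
}

/// Hand-written analogue of the future returned by an `async fn(&mut self)`.
struct JoinNext<'a> {
    set: &'a mut TaskSet,
}

// Having a destructor means the borrow is considered used when the value is dropped.
impl<'a> Drop for JoinNext<'a> {
    fn drop(&mut self) {}
}

impl TaskSet {
    fn join_next(&mut self) -> JoinNext<'_> {
        JoinNext { set: self }
    }
}

impl<'a> Future for JoinNext<'a> {
    type Output = Option<u32>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(self.set.finished.pop())
    }
}

/// Waker that does nothing; enough to poll a future once by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn poll_then_mutate() -> (Option<u32>, Vec<u32>) {
    let mut set = TaskSet { finished: vec![1, 2] };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let polled = {
        let mut fut = set.join_next(); // mutably borrows `set`
        let polled = match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(value) => value,
            Poll::Pending => None,
        };
        // Using `set` here would be a second mutable borrow while `fut` is alive.
        polled
    }; // `fut` is dropped here, which ends the borrow

    set.finished.push(3);
    (polled, set.finished)
}
```

The point is only that the future value carries the `&mut` borrow for as long as it exists, so `set` becomes usable again once the future is gone.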

tokio::pin! then expands to

let mut next_termination = next_termination;
let mut next_termination: Pin<&mut TypeOfNextTermination> = unsafe { Pin::new_unchecked(&mut next_termination) };

meaning your mutable borrow will still be around even if you drop the pin as @kpreid mentioned.

Now, if you tried to use terminations, you would get an error saying that terminations is already mutably borrowed (through the Pin, which borrows the impl Future, which borrows terminations).

futures::future::select(next_termination, next_addition).await

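moves the Pin, but since the Pin only contains a reference to the future, dropping it only drops that reference. The future itself, which holds the &mut terminations borrow, is still owned by the shadowed variable and stays alive. Thus accessing terminations gives you the "second mutable borrow" error again.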

When you create the future in the futures::select! call with

 futures::select! {
    // Construct the future inline
    term = terminations.join_next().fuse() => {
        debug!("a process terminated");
    }
    [...]
}

the expansion of select! first creates the futures, awaits them, and then runs the matching branch body with no references to the futures left. In that case the terminations.join_next().fuse() future is dropped and cleaned up before the body runs, so at that point you no longer have multiple mutable borrows.
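The same ordering can be imitated by hand: keep the borrowing value in an inner block, carry only an owned outcome out of it, and touch the set afterwards. The sketch below is std-only and every name in it (`Borrowing`, `Outcome`, `select_then_handle`) is hypothetical. It says nothing about what select! really expands to; it just logs the order in which things happen.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a future that mutably borrows the set.
struct Borrowing<'a> {
    /// Held only to keep the mutable borrow alive.
    _set: &'a mut Vec<String>,
    log: &'a RefCell<Vec<&'static str>>,
}

impl<'a> Drop for Borrowing<'a> {
    fn drop(&mut self) {
        self.log.borrow_mut().push("borrow released");
    }
}

/// Owned result of the "select" step; it holds no reference to the set.
enum Outcome {
    Terminated,
    Added(String),
}

fn select_then_handle(addition: Option<&str>) -> (Vec<String>, Vec<&'static str>) {
    let log = RefCell::new(Vec::new());
    let mut set = vec!["first".to_string()];

    // Step 1: the borrowing value lives only inside this block.
    let outcome = {
        let _pending = Borrowing { _set: &mut set, log: &log };
        let outcome = match addition {
            Some(name) => Outcome::Added(name.to_string()),
            None => Outcome::Terminated,
        };
        outcome
    }; // `_pending` is dropped here, before any branch body runs

    // Step 2: the branch body is free to borrow `set` mutably again.
    log.borrow_mut().push("branch body runs");
    match outcome {
        Outcome::Added(name) => set.push(name),
        Outcome::Terminated => {
            set.pop();
        }
    }
    (set, log.into_inner())
}
```

With `select_then_handle(Some("second"))` the log should read "borrow released" followed by "branch body runs", which is the order the inline-future version relies on.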

The boxed example is another counterintuitive one. If I read the error message correctly, because the Pin<Box<TerminationJoinFuture>> is part of the returned Either type, terminations is considered borrowed until the Either object is dropped, even though the future holding that reference is explicitly dropped earlier:

... and the first borrow might be used here, when that temporary is dropped and runs the destructor for type `futures::future::Either<[...], ([...], Pin<Box<dyn Future<Output =Option<Result<(), JoinError>>> + Send>>)>`
