Say I've got a function that listens for process terminations, where each process termination is represented by a future that resolves when the process terminates. I stick these in a tokio::task::JoinSet so that I can add more of these futures over time. You might write something like this:
use std::collections::BTreeMap;

use futures::FutureExt;
use tokio::sync::mpsc;
use tokio::task::{AbortHandle, JoinSet};
// debug! is assumed to come from whichever logging crate is already in scope.

async fn fake_process_termination_fut() {}

async fn wait_for_terminations(mut additions: mpsc::Receiver<String>) {
    let mut terminations = JoinSet::new();
    let mut abort_handles: BTreeMap<String, AbortHandle> = BTreeMap::new();
    let abort_handle = terminations.spawn(fake_process_termination_fut());
    abort_handles.insert("first".to_string(), abort_handle);
    loop {
        let next_termination = terminations.join_next().fuse();
        //                     ^^^^^^^^^^^^ first mutable borrow here
        let next_addition = additions.recv().fuse();
        tokio::pin!(next_termination, next_addition);
        futures::select! {
            term = next_termination => {
                debug!("a process terminated");
            }
            name = next_addition => {
                let name = name.unwrap();
                let handle = terminations.spawn(fake_process_termination_fut());
                //           ^^^^^^^^^^^^ second mutable borrow here
                abort_handles.insert(name, handle);
            }
        }
    }
}
However, if you do this you get an error about multiple mutable borrows:
error[E0499]: cannot borrow `terminations` as mutable more than once at a time
   --> src/terminations.rs:268:30
    |
258 |         let next_termination = terminations.join_next().fuse();
    |                                ------------ first mutable borrow occurs here
...
268 |                 let handle = terminations.spawn(fake_process_termination_fut());
    |                              ^^^^^^^^^^^^ second mutable borrow occurs here
...
272 |     }
    |     - first borrow might be used here, when `next_termination` is dropped and runs the destructor for type `futures::future::Fuse<impl futures::Future<Output = std::option::Option<std::result::Result<(), tokio::task::JoinError>>>>`
This makes some kind of sense to me. When I create the future for next_termination, it may have a reference to something inside terminations, and that reference may still be alive due to the first branch of select!. I'm not sure what the select! macro expands to, so this sounds plausible to me.
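To check my reading of the error, here is a std-only sketch of the same shape. Set and Next are hypothetical stand-ins I made up for JoinSet and the future returned by join_next(); they are not tokio types, and the sketch only models the "value with a destructor holds a mutable borrow" part of the message.

```rust
// Hypothetical stand-in for JoinSet: it only counts spawned tasks.
struct Set {
    spawned: usize,
}

// Hypothetical stand-in for the future returned by join_next().
// It keeps `&mut Set` and has a destructor, so the borrow checker must
// assume the borrow is still used when the value is dropped.
struct Next<'a> {
    set: &'a mut Set,
}

impl Drop for Next<'_> {
    fn drop(&mut self) {
        // Touch the borrowed set so the field is clearly used on drop.
        let _ = self.set.spawned;
    }
}

impl Set {
    fn join_next(&mut self) -> Next<'_> {
        Next { set: self }
    }

    fn spawn(&mut self) {
        self.spawned += 1;
    }
}

fn run() -> usize {
    let mut set = Set { spawned: 0 };
    {
        let _next = set.join_next(); // first mutable borrow
        // set.spawn(); // E0499 if uncommented: `_next` is dropped at the closing brace below
    } // `_next` is dropped here, which ends the first borrow
    set.spawn(); // fine: no other borrow is alive
    set.spawned
}
```

Calling run() returns 1. The point is that the borrow lasts until the value holding it is actually dropped, which by default is the end of its enclosing scope.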
To fix this, I rewrote the function slightly so that I get access to the unresolved next_termination future and can drop it to release the borrow:
use futures::future::Either;

async fn wait_for_terminations2(mut additions: mpsc::Receiver<String>) {
    let mut terminations = JoinSet::new();
    let mut abort_handles: BTreeMap<String, AbortHandle> = BTreeMap::new();
    let abort_handle = terminations.spawn(fake_process_termination_fut());
    abort_handles.insert("first".to_string(), abort_handle);
    loop {
        let next_termination = terminations.join_next().fuse();
        //                     ^^^^^^^^^^^^ first mutable borrow here
        let next_addition = additions.recv().fuse();
        tokio::pin!(next_termination, next_addition);
        match futures::future::select(next_termination, next_addition).await {
            Either::Left((termination, _next_addition)) => {
                debug!("a process terminated");
            }
            Either::Right((addition, _next_termination)) => {
                // intentionally drop the unresolved future here
                drop(_next_termination);
                let name = addition.unwrap();
                let handle = terminations.spawn(fake_process_termination_fut());
                //           ^^^^^^^^^^^^ second mutable borrow here
                abort_handles.insert(name, handle);
            }
        }
    }
}
This gives me the same error even though I'm explicitly dropping the unresolved future (_next_termination) before calling spawn.
This is the part that breaks my brain a little bit. What's going on?
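One possible lead, which I have not confirmed against the macro source: tokio::pin! appears to shadow the variable with a Pin<&mut _> pointing at the original future, so what select hands back in Either::Right would be that pinned reference, not the future itself. If so, drop(_next_termination) only drops the reference, and the future holding the borrow lives until the end of the loop body. The sketch below uses std::pin::pin! (Rust 1.68+) on the assumption that it behaves like tokio::pin! in this respect; Noisy is a hypothetical type that records when its destructor runs.

```rust
use std::cell::Cell;
use std::pin::pin;

// Hypothetical type that flips a flag when its destructor runs.
struct Noisy<'a> {
    dropped: &'a Cell<bool>,
}

impl Drop for Noisy<'_> {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

// Returns (flag right after dropping the Pin, flag after the scope ends).
fn drop_pin_then_check() -> (bool, bool) {
    let dropped = Cell::new(false);
    let after_drop_call;
    {
        // `pinned` is a Pin<&mut Noisy>; the Noisy value itself lives in
        // a hidden slot that lasts until the end of this block.
        let pinned = pin!(Noisy { dropped: &dropped });
        drop(pinned); // drops only the pinned reference
        after_drop_call = dropped.get();
    } // the Noisy value is dropped here
    (after_drop_call, dropped.get())
}
```

drop_pin_then_check() returns (false, true): the destructor has not run after drop(pinned), only once the enclosing block ends. If the same holds for tokio::pin!, one thing to try would be wrapping the pin and the select in an inner block that evaluates to the name, and calling spawn after that block.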