How can I tell the compiler the immutable borrows don't exist anymore after polling a FuturesUnordered?

I have code similar to the following. Essentially, I run a bunch of computations in parallel based on values from the initial context, aggregate the results, and once everything is back, update the context with the new values.

However, the compiler refuses to let me save the new values into the context, claiming the context is still borrowed immutably. That's true: my futures hold references to the context to avoid having to clone it. But since the futures are guaranteed to have completed by the time I write the result back, my understanding is that the mutable borrow is safe.

Is there a clean way to let the compiler know? I tried drop(self) and drop(self.running) to no avail.


use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};

struct Foo {
    pub base_value: u32,
    pub values: std::collections::HashMap<u32, u32>,
}

struct Bar<'a> {
    pub running: FuturesUnordered<BoxFuture<'a, (u32, u32)>>,
}

impl<'a> Bar<'a> {
    pub fn schedule(&mut self, foo: &'a Foo, n: u32) {
        self.running.push(Box::pin(async move {
            (n, foo.base_value + n)
        }));
    }

    pub async fn run(mut self, foo: &'a mut Foo) {
        let mut values = foo.values.clone();
        
        self.schedule(foo, 1);
        self.schedule(foo, 10);

        while let Some((n, updated_n)) = self.running.next().await {
            values.insert(n, updated_n);
        }

        // Why isn't 'drop' here telling the compiler that the borrows
        // are done? Also tried 'drop(self.running)'
        drop(self);

        foo.values = values;
    }
}

fn main() {}

I think the clearest limiting factor this code hits is that, unfortunately, FuturesUnordered is not covariant.[1] Otherwise, something like let this = self;, which allows a variance coercion, could probably save this code.

In other words, the compiler “cannot know” that FuturesUnordered<BoxFuture<'a, (u32, u32)>> acts towards the lifetime 'a like a container of 'a-lifetime references. It’s this very “acts like a container” property that makes the reasoning of “foo should be safe to use after self is dropped” valid in the first place. Here’s why:

Imagine that FuturesUnordered<BoxFuture<'a, (u32, u32)>>, and thus Bar<'a>, contained a fn(&'a Foo) callback, a typical example of something not covariant in 'a. Then imagine, for instance, that we call run in the case 'a == 'static. From the compiler’s POV, this is one possibility that needs to be handled soundly. In that case we must be calling schedule with a &'static Foo reference, and within it, the (hypothetical) fn(&'static Foo) callback might leak this reference to some global place somewhere else. We mustn’t mutate Foo after this, as the shared reference keeps on existing.
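To make the hypothetical concrete, here is a minimal std-only sketch of such a callback. Everything in it is an assumption made for illustration: this Bar holds only a fn pointer (it is not the real FuturesUnordered), and LEAKED, leak and leak_through_callback are made-up names. It shows that for 'a == 'static the shared reference can outlive the Bar value, so dropping the Bar proves nothing about the borrow.

```rust
use std::sync::Mutex;

struct Foo {
    base_value: u32,
}

// Hypothetical global that a callback could stash a reference in.
static LEAKED: Mutex<Option<&'static Foo>> = Mutex::new(None);

// Hypothetical callback: stores the shared reference somewhere global.
fn leak(foo: &'static Foo) {
    *LEAKED.lock().unwrap() = Some(foo);
}

// Stand-in for a `Bar` that is not covariant in 'a
// (a fn pointer is contravariant in its argument type).
struct Bar<'a> {
    callback: fn(&'a Foo),
}

impl<'a> Bar<'a> {
    fn schedule(&mut self, foo: &'a Foo) {
        (self.callback)(foo);
    }
}

fn leak_through_callback() -> Option<u32> {
    // A `&'static mut Foo`, so `run`-like code could be called with 'a == 'static.
    let foo: &'static mut Foo = Box::leak(Box::new(Foo { base_value: 7 }));
    let mut bar: Bar<'static> = Bar { callback: leak };

    // Shared reborrow for 'static: `foo` can never be mutated again.
    bar.schedule(&*foo);
    drop(bar);

    // `bar` is gone, but the shared reference still exists in `LEAKED`.
    let leaked: Option<&'static Foo> = *LEAKED.lock().unwrap();
    leaked.map(|foo| foo.base_value)
}
```

Calling leak_through_callback() still finds the reference in the global after drop(bar), which is exactly the situation the compiler has to rule out when it cannot see that Bar<'a> behaves like a container.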


  1. And I suspect that it probably could soundly be covariant… though I haven’t verified this suspicion yet


Now, how best to avoid the issue? There are multiple possibilities, depending on what else your API needs to support.

The question you’ll need to ask is “why does the running: FuturesUnordered<BoxFuture<'a, (u32, u32)>> even exist before we called run?”

Is it already polled? Is there some stuff already added to it? Or is it always completely empty, anyways? In many cases, you might be able to make some sort of builder-pattern instead.

Create a BarBuilder or BarBuilder<'a> (depending on whether or not you already want to collect some BoxFuture<'a, (u32, u32)> beforehand). A BarBuilder<'a> with a lifetime could store the futures in a covariant collection, such as a Vec, but feel free to remove that if it would always be empty anyway. Here’s a proof of concept:

use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};

struct Foo {
    pub base_value: u32,
    pub values: std::collections::HashMap<u32, u32>,
}

struct BarBuilder<'a> {
    pub running: Vec<BoxFuture<'a, (u32, u32)>>,
}

struct Bar<'a> {
    pub running: FuturesUnordered<BoxFuture<'a, (u32, u32)>>,
}

impl<'a> Bar<'a> {
    pub fn schedule(&mut self, foo: &'a Foo, n: u32) {
        self.running.push(Box::pin(async move {
            (n, foo.base_value + n)
        }));
    }
}

impl<'a> BarBuilder<'a> {
    pub async fn run(self, foo: &'a mut Foo) {
        let mut this = Bar {
            running: FuturesUnordered::from_iter(self.running),
        };
        let mut values = foo.values.clone(); // hint: you could consider using `std::mem::take(&mut foo.values)` here instead
        
        this.schedule(foo, 1);
        this.schedule(foo, 10);

        while let Some((n, updated_n)) = this.running.next().await {
            values.insert(n, updated_n);
        }

        // 'drop' here is effectively telling the compiler that the borrows are done.
        drop(this);

        foo.values = values;
    }
}

fn main() {}


Of course, feel free to come up with a better name than BarBuilder.
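The std::mem::take hint in the code comment could look roughly like the sketch below. It is synchronous and std-only, and update_in_place plus its updates parameter are hypothetical names that are not part of the code above; the point is only to show the map being moved out and written back without a clone.

```rust
use std::collections::HashMap;

struct Foo {
    base_value: u32,
    values: HashMap<u32, u32>,
}

// Hypothetical helper: updates `foo.values` without cloning the map.
fn update_in_place(foo: &mut Foo, updates: &[u32]) {
    // Moves the map out and leaves an empty `HashMap::default()` behind.
    let mut values = std::mem::take(&mut foo.values);

    for &n in updates {
        values.insert(n, foo.base_value + n);
    }

    // Write the (now larger) map back into the context.
    foo.values = values;
}
```

One thing to keep in mind: between the take and the write-back, foo.values is empty, so anything that reads it in the meantime sees no entries, and in an async run the existing entries are lost if the future is dropped before the write-back happens.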


To be fair, the previous code doesn’t really need Vec’s covariance either.[1] You could also collect a FuturesUnordered into a new FuturesUnordered. This discards some information about which of the in-flight futures currently need polling and which don’t, but that only results in at most one superfluous extra poll per future and shouldn’t be much of an issue.[2]

So even this kind of solution is possible:

use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};

struct Foo {
    pub base_value: u32,
    pub values: std::collections::HashMap<u32, u32>,
}

struct Bar<'a> {
    pub running: FuturesUnordered<BoxFuture<'a, (u32, u32)>>,
}

impl<'a> Bar<'a> {
    pub fn schedule(&mut self, foo: &'a Foo, n: u32) {
        self.running.push(Box::pin(async move {
            (n, foo.base_value + n)
        }));
    }

    pub async fn run(self, foo: &'a mut Foo) {
        let mut this = Bar { // uses `.map` to do the variance coercion
            running: self.running.into_iter().map(|fut| fut).collect()
            , ..self
        };
        let mut values = foo.values.clone(); // hint: you could consider using `std::mem::take(&mut foo.values)` here instead
        
        this.schedule(foo, 1);
        this.schedule(foo, 10);

        while let Some((n, updated_n)) = this.running.next().await {
            values.insert(n, updated_n);
        }

        // 'drop' here is effectively telling the compiler that the borrows are done.
        drop(this);

        foo.values = values;
    }
}

fn main() {}



  1. It’s using it, but the same approach as below would work, too, if Vec weren’t covariant

  2. Of course, the whole iterate+collect scheme feels like unnecessary overhead where a simple no-op conversion should ideally be enough [I believe]

Maybe allowing variance coercion would not save the code?

schedule(&mut self, foo: &'a Foo, n: u32) is covariant in &'a Foo, so we can pass a &'c Foo where 'c: 'a; the result shows the code still can't be compiled.

The reason, I think, is the signature async fn run<'c: 'a>(mut self, foo: &'c mut Foo), which desugars to async fn run<'c: 'a>(mut this: Bar<'a>, foo: &'c mut Foo). Because 'a is a named lifetime parameter of the function, it outlives run's body. The drop(self) can't stop this.

And because foo is borrowed until the end of 'a, and 'a stays active during the whole function body, foo.values can't be assigned.

Look at the code in the last of my 3 replies above, I think it conclusively proves that covariance coercion would be enough to save the code, because all the first statement there does is convert the Bar<'a> to a Bar<'b> with a shorter lifetime 'b.

I cannot fully understand your reasoning as for why it shouldn't be enough, but perhaps you're making the mistake of assuming that the lifetimes 'a must be the same between the call to run and schedule. This isn't the case, and the run function can call the schedule function just fine with the shorter lifetime 'b in place of 'a, passing a &mut Bar<'b> and a &'b Foo parameter, and the lifetime 'b can then end at the place where the Bar<'b> value is dropped in the body of run, limiting the duration that the &'a mut Foo is re-borrowed for to the same place.
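Here is a std-only sketch of that reasoning with a type that actually is covariant. It rests on an assumption made purely for illustration: boxed closures in a Vec stand in for the boxed futures in a FuturesUnordered, and run is synchronous, so this Bar is not the one from the question. With that swap, let mut this = self; is enough.

```rust
use std::collections::HashMap;

struct Foo {
    base_value: u32,
    values: HashMap<u32, u32>,
}

// Hypothetical stand-in for `Bar`: a `Vec` of boxed closures is covariant in 'a.
struct Bar<'a> {
    pending: Vec<Box<dyn Fn() -> (u32, u32) + 'a>>,
}

impl<'a> Bar<'a> {
    fn schedule(&mut self, foo: &'a Foo, n: u32) {
        self.pending.push(Box::new(move || (n, foo.base_value + n)));
    }

    fn run(self, foo: &'a mut Foo) {
        // Covariance lets this move turn Bar<'a> into Bar<'b> for a shorter 'b.
        let mut this = self;
        let mut values = foo.values.clone();

        // Each call reborrows `*foo` only for 'b, not for all of 'a.
        this.schedule(&*foo, 1);
        this.schedule(&*foo, 10);

        for job in &this.pending {
            let (n, updated_n) = job();
            values.insert(n, updated_n);
        }

        // Last use of `this`: 'b ends here, and the shared reborrows with it.
        drop(this);

        foo.values = values;
    }
}
```

If the let mut this = self; line is removed and self is used directly, the method calls go through &mut Bar<'a>, the reborrows of foo have to last for all of 'a, and the final assignment is rejected again.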


&mut T is invariant in T, so you can't pass a Bar<'a> to Bar<'b>::schedule.

Edit:
Yes, you are right. Your 2nd code compiles, and my code is still rejected because of invariance, although it's not the invariance of Bar<'_> (or FuturesUnordered).
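For reference, the difference between shared and exclusive references mentioned above can be seen in a small std-only sketch. The function names count_matches, push_word and variance_demo are made up for this illustration and do not come from the thread.

```rust
// Shared access: a `&Vec<&'static str>` can be used as a `&Vec<&'s str>`.
fn count_matches<'s>(words: &Vec<&'s str>, needle: &'s str) -> usize {
    words.iter().filter(|w| **w == needle).count()
}

// Exclusive access: the element lifetime has to match exactly.
fn push_word<'s>(words: &mut Vec<&'s str>, word: &'s str) {
    words.push(word);
}

fn variance_demo() -> (usize, usize) {
    let mut words: Vec<&'static str> = vec!["a", "b", "a"];
    let local = String::from("a");

    // Compiles: covariance shrinks 'static to the lifetime of `local`.
    let matches = count_matches(&words, &local);

    // Would not compile: `&mut Vec<&'static str>` is invariant in the
    // element type, so `word` would have to be a `&'static str`.
    // push_word(&mut words, &local);

    // Fine: a string literal is `&'static str`.
    push_word(&mut words, "c");

    (matches, words.len())
}
```

This is the same rule that forces 'a to stay fixed when schedule is called on self directly, and it is why first moving the value into a new binding (or rebuilding it) is what makes room for a shorter lifetime.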
