Help creating a future factory

Hi all,

I'm new to learning Rust and I'm having some issues writing a future factory.

I'm trying to create a future factory that stores the last future it created so that it can be polled. Due to requirements I can't use dynamic dispatch here, although I previously had a working solution that used it. The general flow of what I'm trying to do is:

  1. Initialise the future factory with some provided arguments that it uses to create new futures.
  2. When required, poll the factory's latest future.
    2.1. If the future returns OK, then finish.
    2.2. If it fails, create a new future and set the factory's latest to this instance. Outside of what I've posted here, it will do some other work to retry it in a different way.

Calling the factory to create a new instance if the future fails is done externally to all of this and that system is working fine with the dynamic dispatch approach. The problem that I'm having here is doing this without dynamic dispatch - the reason for this is that it's a super hot area in the code.

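use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project::pin_project;

enum FutureError {}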

#[pin_project]
pub struct Factory<F, Func> {
    func: Func,
    #[pin]
    s: Wrapper<F>,
}

impl<F, Func> Future for Factory<F, Func>
    where F: Future<Output=Result<(), FutureError>>
{
    type Output = Result<(), FutureError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().s.poll(cx)
    }
}

impl<F> Future for Wrapper<F>
    where F: Future<Output=Result<(), FutureError>>
{
    type Output = Result<(), FutureError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().fut.poll(cx)
    }
}

#[pin_project]
pub struct Wrapper<F> {
    #[pin]
    fut: F,
}

impl<F> Wrapper<F> {
    pub fn new(f: F) -> Self {
        Wrapper { fut: f }
    }
}

impl<F, Func> Factory<F, Func>
    where
        F: Future<Output=Result<(), FutureError>>,
        Func: Fn() -> F,
{
    pub fn wrap() -> Factory<F, Func> {
        let func = || get_future();
        let f = func();

        Factory {
            func,
            s: Wrapper::new(f),
        }
    }
}

async fn get_future() -> Result<(), FutureError> {
    // Do more interesting stuff but that's not important
    Ok(())
}

The error that I'm receiving with my last iteration on this is:

error[E0308]: mismatched types
  --> src/second.rs:49:13
   |
39 | impl<F, Func> Factory<F, Func>
   |         ---- this type parameter
...
45 |         let func = || get_future();
   |                    --------------- the found closure
...
49 |             func,
   |             ^^^^ expected type parameter `Func`, found closure
...
55 | async fn get_future() -> Result<(),FutureError> {
   |                          ---------------------- the `Output` of this `async fn`'s found opaque type
   |
   = note: expected type parameter `Func`
                     found closure `[closure@src/second.rs:45:20: 45:35]`

I've tried using fn, Fns and other versions but I cannot see a solution to this. This last iteration only uses Fns as I thought I had got close to a solution. I'm not sure how to do it with or without them!

Any help would be greatly appreciated!

The async fn will return an opaque type, that is a type that cannot be named. You can work around this by manually creating a struct that implements Future and then returning that from get_future.

Something like this playground.

@najamelan Your playground link works, but I don't think it's related to how async fn returns a dynamic future?

If I replace get_future in it again with an async fn, it works fine too: playground.

I think the problem in the original post is that wrap is generic over the return type. Specifically, by declaring generics <F, Func> you are explicitly saying "whoever calls wrap gets to choose what F and Func are". But then you go on and supply your own function. What if the caller chose a different type for Func? Then the struct would be invalid. This is what Rust is complaining about.

One solution to this is to have the wrap function take in a function, and place that function unchanged inside the struct. This way you're actually using the generic type, and it works. @najamelan's playground uses this solution.
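
A minimal sketch of that first solution, assuming a simplified Factory that drops the Wrapper and the Future impl from the original post. The renew helper is hypothetical and only there to show the stored function being reused:

```rust
use std::future::Future;

#[derive(Debug)]
pub enum FutureError {}

pub struct Factory<F, Func> {
    func: Func,
    latest: F,
}

impl<F, Func> Factory<F, Func>
where
    F: Future<Output = Result<(), FutureError>>,
    Func: Fn() -> F,
{
    /// The caller passes the function in, so `Func` and `F` really are
    /// caller-chosen and the struct literal below type-checks.
    pub fn wrap(func: Func) -> Self {
        let latest = func();
        Factory { func, latest }
    }

    /// Hypothetical helper: replace the stored future with a fresh one.
    pub fn renew(&mut self) {
        self.latest = (self.func)();
    }
}

async fn get_future() -> Result<(), FutureError> {
    Ok(())
}

fn main() {
    // `Func` is inferred as the function item `get_future` and `F` as its
    // opaque future type; no `dyn` is involved.
    let mut factory = Factory::wrap(get_future);
    factory.renew();
}
```

The type of factory cannot be written out by hand, but it never needs to be as long as it lives in a local variable or inside another generic.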

Another general solution would be to use an impl Trait return type to return something which is opaque, but not caller-chosen. However, since your type involves two type parameters which depend on each other, I don't think this is possible.

Thanks for the replies! I'll take a proper look at them tomorrow. Is it possible to do this with chained futures too? I.e., the factory returns futA.and_then(|r| futB(r))? Or the factory uses the closure to create a future, then chains it with another one and stores the result in the struct.

Yes, all the combinators in futures-rs return specific types. That does mean that the nesting generics become quite unwieldy.

In principle, since the stabilization of async/await, you don't need many combinators anymore, and I feel the code becomes much nicer for it:

async
{
   futa.await;
   futb.await;
}

is basically a chained future. Now if you need to store it, it's a different story. The combinator would basically give you some nameable type like:

Then<FutA, FutB>

The async block is of an unnamable type. You can only store it as a Pin<Box<dyn Future<Output=()> + Send>>, or in a local variable that does not need type annotations. This can be fine in many cases, but sometimes it's not.
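
As a rough illustration of the boxed option, here is a sketch that stores an async block in a struct field. Holder, futa, futb, the u32 output and the no-op waker are all assumptions made for this example; a real program would poll through an executor instead of poll_once:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Nameable, but heap-allocated and dynamically dispatched.
type BoxedFuture = Pin<Box<dyn Future<Output = u32> + Send>>;

struct Holder {
    fut: BoxedFuture,
}

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once(holder: &mut Holder) -> Poll<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    holder.fut.as_mut().poll(&mut cx)
}

async fn futa() -> u32 {
    1
}

async fn futb(x: u32) -> u32 {
    x + 1
}

fn main() {
    // The async block's own type has no name, but it coerces to the boxed
    // trait object that the field asks for.
    let mut holder = Holder {
        fut: Box::pin(async {
            let a = futa().await;
            futb(a).await
        }),
    };
    println!("{:?}", poll_once(&mut holder));
}
```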

Another way to look at it is through JoinHandle<T>. It's a future that tracks the completion of a spawned task. Here T is the output of the future. That means that all JoinHandle<SomeType> are the same type, even though you could have spawned async fn, async blocks or other types of futures, as long as they return the same Future::Output.

I wrote two libraries that build on this: async_executors, which exposes an executor-agnostic JoinHandle, and async_nursery, which uses this to store them in a FuturesUnordered. As FuturesUnordered is generic over the type of future you store in it, it really shows its strength when you store JoinHandles, because otherwise you couldn't store different async blocks or async fns in it, as they would all be different unnameable types.

@najamelan, thanks for the advice! I've taken a look through the libraries that you've linked and they've given me some ideas on how to do this. But I'm still having some issues ordering it and working out exactly how to structure the code. Would you be able to provide some further advice or something on the Rust Playground that would help? I'm struggling to see exactly how to write the types/structure for both the factory and storing/polling the future that is returned.

What I'd like to be able to write is something like:

  • Have some function that returns a chained future.
  • Store the chained future.
  • Poll this chained future and if it errors, start the process again.

I don't mind writing some example code, but it's hard to understand exactly what you are trying to achieve from your description.

As for the points you mention:

Have some function that returns a chained future: any future that awaits or polls several inner futures is a chained future, so this shouldn't in itself be an obstacle.

Store the chained future: this means it has to be a nameable type. It can be:

  • a custom type that you implement and that polls inner futures in Future::poll
  • a type from the futures lib like Then
  • a Pin<Box<dyn Future>>

Thus if it's just async fn or async {} you have to box it.
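
For the first option in that list, a hand-written chained future might look roughly like the sketch below. Chain, TwoSteps and poll_once are hypothetical names, and the Unpin bounds are an assumption made so the example needs neither unsafe code nor a pin-projection crate:

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Runs `first` to completion, then `second`; stops early on an error.
struct Chain<A, B> {
    first: Option<A>,
    second: B,
}

impl<A, B, E> Future for Chain<A, B>
where
    A: Future<Output = Result<(), E>> + Unpin,
    B: Future<Output = Result<(), E>> + Unpin,
{
    type Output = Result<(), E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if let Some(first) = this.first.as_mut() {
            match Pin::new(first).poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                // First step done: drop it and fall through to the second.
                Poll::Ready(Ok(())) => this.first = None,
            }
        }
        Pin::new(&mut this.second).poll(cx)
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

// Built from nameable parts, so the whole chain can be written down.
type Step = Ready<Result<(), &'static str>>;
type TwoSteps = Chain<Step, Step>;

fn main() {
    let mut chain: TwoSteps = Chain {
        first: Some(ready(Ok(()))),
        second: ready(Ok(())),
    };
    println!("{:?}", poll_once(&mut chain));
}
```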

Poll this chained future and if it errors, start the process again: you can await it and it should return a Result. You match on the result and if it's an error, start over.

@najamelan, this is the rough flow of what I'm trying to do: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=1bcb33f987fcad32100732a605f14a61. It doesn't compile, but the flow is there. Hopefully, that is clearer! Following getting this working, the Fn will have arguments and they'll be passed to the first future (func1). Will that cause any issues that you're aware of? As I said previously, I'm trying to do this without dynamic dispatch.

There is an attempts field and this will be used to only retry the future a number of times.

It'd be nice if any function could be passed in to the factory and for the process to be the same for the poll function. From what I understand, the chained futures all have to return the same type for it to work? Or is that a misunderstanding?

From what I understand:

You want a type of future that when polled:

  • polls an inner future
  • if that returns an error, disposes of it and creates a new one
  • does that up to attempts times
  • is flexible over the type of inner future

aka a general retry mechanism?

The crux of what cannot work in what you try to do is use async fn but no dynamic dispatch. The types are un-namable and cannot go into a generic F, because each invocation returns a distinct un-namable type. As you want to re-create the future after the last one fails, it will be a different type, so a different F.

Either you work with specific types of futures, or you box them. I suppose the most idiomatic way of doing this is to just require F: Default. Then your FutureWrapper can just call F::default() to make a new one and you don't need a factory.

Alternatively you can take a function pointer to a function that takes no input parameters. You could get around the no input requirement with TLS or globals, but I suggest you don't go there. It quickly has footguns and it has overhead as well.

If it needs input other than type information, you need a boxed closure (closures are also un-nameable) as a factory, which again means you need to allocate.

On nightly, you can get round the allocation with unsized locals, but not around the dynamic dispatch.

So I think it's really not possible to do what you try to do, unless your futures are nameable types and they require no input to be created.

As a final note, are you really certain that the overhead of dynamic dispatch is unacceptable here? Don't optimize it prematurely. If it would be conceivable for anyone to write the software you are writing in anything other than assembler or C, it's probably not the case.

Yes, a retry mechanism. As there is the restriction of not using dynamic dispatch it is OK for them to be concrete types and not use generics. Would you be able to give an example of doing it using the boxed/non-generic approach?

I did fear that this may be too constrained. But as this is a core part of a highly intensive application there is the restriction in this area to try and not use it. I completely agree with you as to not prematurely optimise the application!

I appreciate your help!

I don't entirely understand. If they are concrete types, they don't need to be boxed and they can have generics.

However if it's boxed dyn trait, it will always use dynamic dispatch + heap allocation.

Sorry, I think I misunderstood what you initially meant as using Box to get around the dynamic dispatch.

What do you see as the solution to this problem? It doesn't matter if this is a one-off, non-reusable part of the code, as long as there is some retry system that doesn't use dynamic dispatch for these futures.

If you want to use async fn, I would write out the retry-loop manually and then see step by step if you can abstract anything out to make it more convenient, but I don't think you'll be able to abstract much without dynamic dispatch.
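
A manually written retry loop could start out something like this sketch. The names retry, flaky and block_on are made up for the example, and block_on is a toy busy-polling executor that only suits futures which never really wait:

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Calls `factory` up to `attempts` times (always at least once) and returns
/// the first success or the last error. Everything is statically dispatched.
async fn retry<Fac, Fut, E>(factory: Fac, attempts: u32) -> Result<(), E>
where
    Fac: Fn() -> Fut,
    Fut: Future<Output = Result<(), E>>,
{
    let mut last = factory().await;
    for _ in 1..attempts {
        if last.is_ok() {
            break;
        }
        last = factory().await;
    }
    last
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical flaky operation: fails until the third call.
async fn flaky(calls: &Cell<u32>) -> Result<(), &'static str> {
    calls.set(calls.get() + 1);
    if calls.get() < 3 {
        Err("not yet")
    } else {
        Ok(())
    }
}

fn main() {
    let calls = Cell::new(0);
    let outcome = block_on(retry(|| flaky(&calls), 5));
    println!("{:?} after {} calls", outcome, calls.get());
}
```

Because retry is itself an async fn, the future it returns is unnameable too, so this form works best when the caller awaits it directly rather than storing it in a struct field with a written-out type.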

On the other hand, if you make concrete types of your futures, it's completely possible.

You could even make a future that you can just keep polling after it has resolved. If it runs into an error, let it reset its internal state. The caller then keeps polling it up to attempts times or until it returns success.
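
A sketch of that idea, under loud assumptions: Connect is a hypothetical concrete future that fails a fixed number of times, FutureError gains a Failed variant purely so there is something to return, and drive stands in for whatever external code does the polling:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
enum FutureError {
    Failed,
}

/// Concrete, nameable future that may be polled again after an error.
struct Connect {
    failures_left: u32,
    progress: u32,
}

impl Future for Connect {
    type Output = Result<(), FutureError>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.progress += 1; // pretend some work was done
        if self.failures_left > 0 {
            self.failures_left -= 1;
            self.progress = 0; // reset so the next poll is a fresh attempt
            return Poll::Ready(Err(FutureError::Failed));
        }
        Poll::Ready(Ok(()))
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` up to `attempts` times or until it succeeds.
fn drive(fut: &mut Connect, attempts: u32) -> Result<(), FutureError> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut last = Err(FutureError::Failed);
    for _ in 0..attempts {
        match Pin::new(&mut *fut).poll(&mut cx) {
            Poll::Ready(Ok(())) => return Ok(()),
            Poll::Ready(Err(e)) => last = Err(e),
            // `Connect` never returns Pending; a real caller would wait
            // for the waker here instead of counting it as an attempt.
            Poll::Pending => {}
        }
    }
    last
}

fn main() {
    let mut conn = Connect { failures_left: 2, progress: 0 };
    println!("{:?} (progress {})", drive(&mut conn, 3), conn.progress);
}
```

Polling a future again after it has returned Ready goes beyond what the Future trait promises in general, so this only works because Connect is written to allow it.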

I have looked at implementing a concrete type for the future, but the problem with that approach was that several other async calls are made inside the future, which is why I've been looking at alternative approaches. The nested async calls have been another part of the problem, which has led to the use of generics.

Really constrained at all angles here it seems! I haven't been sure if there's a gap in my knowledge with futures or that this just isn't easily solvable.

I believe there's another option here: if the thing storing the type is generic, you are completely free to instantiate that generic parameter with an unnameable type. Sure, the resulting type is also unnamable, but as long as you don't need to store that, it's still OK.

For instance, it's 100% valid to store unnameable closures in a generic struct:

struct UsesClosure<F> {
    retry: F,
}
impl<F: FnMut() -> u32> UsesClosure<F> {
    fn new(retry: F) -> Self {
        UsesClosure { retry }
    }
    fn get(&mut self) -> u32 {
        (self.retry)()
    }
}

fn main() {
    let mut var = 0;
    // note: due to captured variables, this is unnameable
    let closure = || {
        var += 1;
        var
    };
    // the type `UsesClosure<???>` is _also_ now unnameable. But despite that,
    // we can still use it!
    let mut v = UsesClosure::new(closure);
    println!("{}", v.get());
    println!("{}", v.get());
}

That's a simpler example, but the same logic applies to async functions. We can totally store them as long as we're willing to let the resulting type also be unnameable.
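
To make that concrete for futures, here is a sketch of a generic retry wrapper that stores both a closure and the async fn future it produces. Retry, connect and block_on are hypothetical names. Keeping the inner future in a Pin<Box<F>> is an assumption made to avoid unsafe pin projection or the pin_project crate: it costs one allocation at construction, but every call is still statically dispatched because F is a concrete type parameter.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Retry<Fac, F> {
    factory: Fac,
    current: Pin<Box<F>>,
    retries_left: u32,
}

impl<Fac, F, E> Retry<Fac, F>
where
    Fac: Fn() -> F,
    F: Future<Output = Result<(), E>>,
{
    fn new(factory: Fac, retries: u32) -> Self {
        let current = Box::pin(factory());
        Retry { factory, current, retries_left: retries }
    }
}

impl<Fac, F, E> Future for Retry<Fac, F>
where
    Fac: Fn() -> F + Unpin,
    F: Future<Output = Result<(), E>>,
{
    type Output = Result<(), E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            match this.current.as_mut().poll(cx) {
                Poll::Ready(Err(e)) => {
                    if this.retries_left == 0 {
                        return Poll::Ready(Err(e));
                    }
                    this.retries_left -= 1;
                    // Overwrite the failed future in place and poll again.
                    let next = (this.factory)();
                    this.current.set(next);
                }
                other => return other,
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor for the example only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async fn that takes an argument and fails twice.
async fn connect(calls: &Cell<u32>) -> Result<(), &'static str> {
    calls.set(calls.get() + 1);
    if calls.get() < 3 { Err("refused") } else { Ok(()) }
}

fn main() {
    let calls = Cell::new(0);
    // The type is `Retry<{closure}, {async fn future}>`: unnameable, yet usable.
    let retry = Retry::new(|| connect(&calls), 5);
    println!("{:?} after {} calls", block_on(retry), calls.get());
}
```

The closure also shows one way to pass arguments to the async fn each time a new future is created.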

Here are two possibilities I see for making this compile, using the strategy above (using a generic to get rid of the "unnameable" problem):

I also had to modify the trait implementation slightly, adding as_mut() to get it to compile.

Do either of these solutions work for your use case?

Edit: added a simple example of a struct containing a generic storing an unnameable type

@daboross, thank you for this! It looks promising. I'll have a look at integrating it like that later on and get back to you.

Regarding your closure/async unnameable comments. I believe that is one of the things that I have been having issues with. I think I just need to wrap some of the async fns/closures as you have demonstrated and then store those - fingers crossed this will resolve the issue that I have been having. I didn't try it yesterday with the closure as I found the compiler message a bit confusing. I'm hoping that once I integrate your suggestions that I won't have any issues with passing arguments to the closure, as these are required to set up the async functions once again.

Really appreciate your comments and playground links!

A follow up question, for your second playground link, I'm slightly confused by this:

// provide dummy generics here so that rustc doesn't have to infer them
// (as we don't use them in the constructor)
impl FutureWrapper<(), ()> {
    pub fn new() -> impl Future {
        let factory = || func1().then(|_| func2());

Why exactly are the types not inferred from the closure? This was one of the things that was confusing me. In the future implementation, the following is written:

F: Future<Output = Result<O, E>>,
Fac: Fn() -> F,

I also added this to the FutureWrapper implementation and the compiler was still unable to resolve it, even when I used the closure and stored the result in the state. To me, it's quite clear that the function returns something that implements a future. How exactly does adding () resolve the issue? Surely that says that both the closure and the future are ()?

How should the compiler know that the generic parameters have any relation to what new returns? Typically you tell it that this is the case by using either Self or the generic parameters directly in the return value, but if you don't have such a return value, there's nothing to tie them together.

One thing to realize is that the generic parameters are parameters.

impl<A, B> FutureWrapper<A, B> {
    pub fn new() -> Self { ... }
}

In this example, A and B are chosen by the user. I could choose that I wanted A = u32 and B = String and call FutureWrapper::<u32, String>::new, and that has to be valid, because those types are allowed by the constraints on the type.

Similarly with your constraints, the constructor would have to allow F and Fac to be any type that satisfies your constraints. We don't want that. We want to use the specific types of the closures and async functions in question, and allow only those.
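
A small sketch of what "chosen by the user" means in practice. Pair is a hypothetical stand-in for FutureWrapper, with Default bounds added only so that new has something to return:

```rust
struct Pair<A, B> {
    a: A,
    b: B,
}

impl<A: Default, B: Default> Pair<A, B> {
    // This body has to work for every `A` and `B` a caller may pick, so it
    // can only rely on the bounds. It could not put one specific closure
    // into a field of type `A`.
    fn new() -> Self {
        Pair { a: A::default(), b: B::default() }
    }
}

fn main() {
    // Two different callers, two different choices of `A` and `B`.
    let x = Pair::<u32, String>::new();
    let y = Pair::<bool, Vec<u8>>::new();
    println!("{} {:?} {} {:?}", x.a, x.b, y.a, y.b);
}
```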

The main reason is that the new function's signature doesn't use FutureWrapper at all. It's essentially a freestanding function which is attached to FutureWrapper for convenience.

The reason for my doing this is related to why there aren't generics: because generics mean the caller gets to choose the type. If we declare <F, Fac> then the caller is always allowed to choose any matching F and Fac. As @alice mentioned, we don't want this, because we want to choose the types (F should be whatever func1().then(|_| func2()) is, and then Fac should be the closure wrapping that).

One possible point of confusion here is that we use FutureWrapper inside the new function. This is completely unrelated to the FutureWrapper<(), ()> used above. It's allowed because we're always allowed to use different generic instantiations of the same type! For instance, the only thing in my first link which ties these two types together is that new is declared as:

    pub fn new(factory: Fac) -> Self {

Fac here uses the generic parameter, and critically, Self refers to the outer FutureWrapper<F, Fac>. Since we never use Self in the new function returning an impl Future, it's just never used.


If that makes sense, then I think it should make sense why the compiler couldn't infer F and Fac if you reintroduce them. We never use them (either directly by declaring something of type Fac, or indirectly by using Self), so type inference has no clues about what they should be.
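
The playground code is not reproduced in this thread, so the following is only a sketch of the pattern being described, under several assumptions: the struct layout is invented, an async block stands in for the then combinator so that only std is needed, and block_on is a toy executor for the example:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct FutureWrapper<F, Fac> {
    current: Pin<Box<F>>,
    factory: Fac,
}

impl<F, Fac, E> Future for FutureWrapper<F, Fac>
where
    F: Future<Output = Result<(), E>>,
    Fac: Fn() -> F + Unpin,
{
    type Output = Result<(), E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let result = this.current.as_mut().poll(cx);
        if let Poll::Ready(Err(_)) = &result {
            // Swap in a fresh future so a later poll starts a new attempt.
            this.current.set((this.factory)());
        }
        result
    }
}

// `()` is only a placeholder so that `FutureWrapper::new` has a home.
// Nothing in the signature mentions `Self`.
impl FutureWrapper<(), ()> {
    pub fn new() -> impl Future<Output = Result<(), &'static str>> {
        let factory = || async {
            func1().await?;
            func2().await
        };
        // A different instantiation from the impl header:
        // `FutureWrapper<{async block}, {closure}>`, chosen in this body.
        FutureWrapper { current: Box::pin(factory()), factory }
    }
}

async fn func1() -> Result<(), &'static str> {
    Ok(())
}

async fn func2() -> Result<(), &'static str> {
    Ok(())
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    println!("{:?}", block_on(FutureWrapper::new()));
}
```

If the impl header were changed back to impl<F, Fac> FutureWrapper<F, Fac>, a call to FutureWrapper::new() would give inference nothing to pin F and Fac down with, which is the situation described above.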

@alice, @daboross thank you for the explanations, this makes much more sense now. I'll remember to bear the points that you have mentioned in mind from now on.

Yes, this was most definitely confusing and I was wondering why they aren't being inferred. Adding this in looks promising.

In my many iterations of this problem, I had something that compiled but the types outside couldn't be inferred - caused by exactly what you have said. I just need to figure out exactly how to express this to the compiler.

On these points, would either of you have any recommended reading on these topics?
