The trait Unpin is not implemented for `std::future::GenFuture`

#1

I am using Futures 0.3 to create a stream and read one element from it. My code fails to compile, and the error says that Unpin is not implemented for a GenFuture type.
Simplified example:

#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
use futures::{select, future, Future, stream, Stream, StreamExt};
use futures::executor::ThreadPool;

fn gen_select_fut() -> impl Future<Output=u32> {
    let mut a = future::ready(0);
    let mut b = future::ready(1);
    async move {
        select! {
            a => a,
            b => b,
        }
    }
}

fn gen_stream() -> impl Stream<Item=u32> {
    stream::iter::<_>(0 .. 16)
        .then(|_| gen_select_fut())
}

fn main() {
    let mut thread_pool = ThreadPool::new().unwrap();
    let my_stream = gen_stream();
    // let res = thread_pool.run(gen_select_fut());
    let res = thread_pool.run(my_stream.into_future());
}

My Cargo.toml:

[package]
name = "check_select_unpin"
version = "0.1.0"
authors = ["real"]
edition = "2018"

[dependencies]

futures-preview = "0.3.0-alpha.7"

Compilation error:

$ cargo run
   Compiling check_select_unpin v0.1.0 (/home/real/temp/check_select_unpin)                                                                                                                                        
error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:futures_util::future::ready::Ready<u32>, b:futures_util::future::ready::Ready<u32> for<'r, 's> {futures_util::future::poll_fn::PollFn<[closure@<::futures_util::async_await::select::select macros>:9:1: 16:43 a:&'r mut futures_util::future::ready::Ready<u32>, b:&'s mut futures_util::future::ready::Ready<u32>]>, ()}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
  --> src/main.rs:25:41                                                                                                                                                                                            
   |                                                                                                                                                                                                               
25 |     let res = thread_pool.run(my_stream.into_future());                                                                                                                                                       
   |                                         ^^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:futures_util::future::ready::Ready<u32>, b:futures_util::future::ready::Ready<u32> for<'r, 's> {futures_util::future::poll_fn::PollFn<[closure@<::futures_util::async_await::select::select macros>:9:1: 16:43 a:&'r mut futures_util::future::ready::Ready<u32>, b:&'s mut futures_util::future::ready::Ready<u32>]>, ()}]>`
   |                                                                                                                                                                                                               
   = help: the following implementations were found:                                                                                                                                                               
             <std::future::GenFuture<T> as std::marker::Unpin>                                                                                                                                                     
   = note: required because it appears within the type `impl core::future::future::Future`                                                                                                                         
   = note: required because it appears within the type `impl core::future::future::Future`                                                                                                                         
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<i32>>, impl core::future::future::Future, [closure@src/main.rs:18:15: 18:35]>`
   = note: required because it appears within the type `impl futures_core::stream::Stream`             

I have an idea of what could go wrong, but I’m not sure about it. Maybe the select! {} macro creates a future that is not Unpin. I tried to verify it by modifying gen_select_fut() signature to be:

fn gen_select_fut() -> impl Future<Output=u32> + std::marker::Unpin

In that case I get the following compilation error:

$ cargo run
   Compiling check_select_unpin v0.1.0 (/home/real/temp/check_select_unpin)                                                                                                                                        
error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:_, b:_ _]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`                           
 --> src/main.rs:5:24                                                                                                                                                                                              
  |                                                                                                                                                                                                                
5 | fn gen_select_fut() -> impl Future<Output=u32> + std::marker::Unpin {                                                                                                                                          
  |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:_, b:_ _]>`
  |                                                                                                                                                                                                                
  = help: the following implementations were found:                                                                                                                                                                
            <std::future::GenFuture<T> as std::marker::Unpin>                                                                                                                                                      
  = note: required because it appears within the type `impl core::future::future::Future`                                                                                                                          
  = note: the return type of a function must have a statically known size                        

So I thought gen_select_fut() is the source of the problem. But, interestingly enough, if I only run gen_select_fut() using the thread_pool, I don’t get any compilation error. The following code compiles successfully:

#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
use futures::{select, future, Future, stream, Stream, StreamExt};
use futures::executor::ThreadPool;

fn gen_select_fut() -> impl Future<Output=u32> {
    let mut a = future::ready(0);
    let mut b = future::ready(1);
    async move {
        select! {
            a => a,
            b => b,
        }
    }
}

fn main() {
    let mut thread_pool = ThreadPool::new().unwrap();
    let res = thread_pool.run(gen_select_fut());
}

Another thing I tried was to implement my own select function for futures, possibly one that behaves better than the select! macro. (Futures 0.1 had such a function, but it seems to be gone in Futures 0.3?)

This is what it looks like:

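// Needs FutureExt in scope for `.map`, e.g. `use futures::future::FutureExt;`.
fn my_select<T>(a: impl Future<Output=T> + Unpin,
                b: impl Future<Output=T> + Unpin) -> impl Future<Output=T> {

    // Turn each future into a one-element stream and merge the two streams.
    let s_a = stream::once(a);
    let s_b = stream::once(b);
    let s = s_a.select(s_b);
    // Take the first item that becomes available and drop the rest of the stream.
    s.into_future()
        .map(|(opt_item, _s)| {
            // The merged stream yields at least one item, so this is Some.
            opt_item.unwrap()
        })
}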

Kind of hacky, but if I use it instead of the select! macro I can make the code compile.
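
For comparison, a select over two Unpin futures can also be sketched with the standard library alone. This is only an illustration, assuming a current stable toolchain (std::future::poll_fn and std::task::Wake) rather than the nightly and futures-preview 0.3.0-alpha.7 used above. The names select_first, NoopWake, poll_once and demo_select are hypothetical helpers, not part of any crate.

```rust
use std::future::{poll_fn, ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Resolves to the output of whichever future is ready first (`a` wins ties).
/// Hypothetical helper for illustration only.
fn select_first<T>(
    mut a: impl Future<Output = T> + Unpin,
    mut b: impl Future<Output = T> + Unpin,
) -> impl Future<Output = T> + Unpin {
    // The closure owns `a` and `b`. Both are Unpin, so the closure and the
    // returned PollFn are Unpin as well.
    poll_fn(move |cx| {
        // `Pin::new` is safe here because the target type is Unpin.
        if let Poll::Ready(v) = Pin::new(&mut a).poll(cx) {
            return Poll::Ready(v);
        }
        Pin::new(&mut b).poll(cx)
    })
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Mirrors the `a`/`b` pair from the question.
fn demo_select() -> Poll<u32> {
    let mut fut = select_first(ready(0u32), ready(1u32));
    poll_once(&mut fut)
}
```

Like my_select, this only accepts Unpin inputs, which is what lets it avoid unsafe. The returned future must not be polled again after it has completed.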

My questions are:

  1. What is going on here? I am sure there is a bigger explanation for why the resulting Stream is not Unpin.
  2. Should I “Pin” the Stream to make the code compile? If so, how should I do it?
  3. If the original gen_select_fut() outputs a Future that is not Unpin, how come I can compile thread_pool.run(gen_select_fut()) without a problem?

#2

There are a few things going on here.

First, the error implies that there’s some form of GenFuture that can implement Unpin. This is incorrect and confusing (I had the same thought you did). GenFuture itself is not Unpin: link (the bounds on T are required because GenFuture has the same constraints).

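In your specific case, the Unpin requirement only comes into play on the my_stream.into_future() line, because into_future pins its future: link (where unsafe_pinned is link). Nothing on the thread_pool.run(gen_select_fut()) path asks for Unpin, which would explain why that version compiles.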
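
The same effect can be reproduced without the futures crate. The sketch below is an assumption-laden stand-in, not the futures code itself: poll_unpin plays the role of an API that demands Unpin, make_future plays the role of gen_select_fut(), and NoopWake and demo_boxed are likewise hypothetical. It uses Box::pin from current stable Rust. Boxing works because Pin<Box<T>> is Unpin even when T is not.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for gen_select_fut(): the future behind an async block is not Unpin.
fn make_future() -> impl Future<Output = u32> {
    async { 1 }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for an API that requires Unpin from its caller.
fn poll_unpin<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

fn demo_boxed() -> Poll<u32> {
    // `poll_unpin(&mut make_future())` is rejected with an Unpin error,
    // because the async block's future is not Unpin.
    // Pinning it on the heap yields a Pin<Box<_>>, which is Unpin.
    let mut fut = Box::pin(make_future());
    poll_unpin(&mut fut)
}
```

Applied to the question, the analogous step would presumably be to box and pin the stream before calling into_future(), at the cost of one heap allocation.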

#3

@Nashenas88: Thanks for coming up with an answer, even two months later!
Interesting point about into_future pinning its future.

I have kept writing code since asking this question, and I have used select! more than once. For some reason I didn’t run into those problems again, and I’m not sure why. Maybe something changed in the implementation of select!, or maybe the way I write code changed (possibly without my being aware of it).

I have to admit there are still things I don’t fully understand about pinning. Most of the time when I see someone pinning something (for example, inside the Futures 0.3 code), it is done inside an unsafe {} block. Is there a way to do pinning correctly without having unsafe in my code?

I also used Box::pinned in a place or two because I couldn’t work out how to make the compiler happy. It really troubled me in the beginning, but then I told myself that I would let myself be a beginner for a while with this new concept, and come back to fix this code later, hopefully when I’m a bit smarter.
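
For reference, pinning without any unsafe in user code might look like the sketch below. It assumes a current stable toolchain, where Box::pin pins on the heap and the std::pin::pin! macro pins a value to the local stack frame. Neither is claimed to match the nightly APIs available when this thread was written. The names NoopWake, poll_pinned, stack_pinned and heap_pinned are hypothetical.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an already pinned future once. No Unpin bound is needed here.
fn poll_pinned<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}

/// Stack pinning: `pin!` yields a Pin<&mut _> tied to this function's frame.
fn stack_pinned() -> Poll<u32> {
    let fut = pin!(async { 2u32 });
    poll_pinned(fut)
}

/// Heap pinning: `Box::pin` yields a Pin<Box<_>> that can be moved around freely.
fn heap_pinned() -> Poll<u32> {
    let mut fut = Box::pin(async { 3u32 });
    poll_pinned(fut.as_mut())
}
```

Stack pinning avoids the allocation but the pinned value cannot outlive the function. The boxed form can be returned or stored, which is probably why it is the usual escape hatch.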

#4

Unfortunately I don’t know the answers to your current questions. I stumbled upon your post while running into the same error in my own conversion to async/await, and I answered it once I had solved my problem and understood what the restrictions were. In my case I had to convert a trait to return a FutureObj, which only exists in the futures-preview crate, and I was still on 0.1 futures with the futures crate since I didn’t realize it had been republished under a new name :sweat_smile:. I always try to answer never-answered posts when I can figure them out, so that the person after me doesn’t have to keep searching.