I am using Futures 0.3 to create a stream and attempt to read one element from it. I have a problem compiling my code, where the error is about Unpin
not implemented for something.
Simplified example:
#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
use futures::{select, future, Future, stream, Stream, StreamExt};
use futures::executor::ThreadPool;
fn gen_select_fut() -> impl Future<Output=u32> {
let mut a = future::ready(0);
let mut b = future::ready(1);
async move {
select! {
a => a,
b => b,
}
}
}
fn gen_stream() -> impl Stream<Item=u32> {
stream::iter::<_>(0 .. 16)
.then(|_| gen_select_fut())
}
fn main() {
let mut thread_pool = ThreadPool::new().unwrap();
let my_stream = gen_stream();
// let res = thread_pool.run(gen_select_fut());
let res = thread_pool.run(my_stream.into_future());
}
My Cargo.toml:
[package]
name = "check_select_unpin"
version = "0.1.0"
authors = ["real"]
edition = "2018"
[dependencies]
futures-preview = "0.3.0-alpha.7"
Compliation error:
$ cargo run
Compiling check_select_unpin v0.1.0 (/home/real/temp/check_select_unpin)
error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:futures_util::future::ready::Ready<u32>, b:futures_util::future::ready::Ready<u32> for<'r, 's> {futures_util::future::poll_fn::PollFn<[closure@<::futures_util::async_await::select::select macros>:9:1: 16:43 a:&'r mut futures_util::future::ready::Ready<u32>, b:&'s mut futures_util::future::ready::Ready<u32>]>, ()}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
--> src/main.rs:25:41
|
25 | let res = thread_pool.run(my_stream.into_future());
| ^^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:futures_util::future::ready::Ready<u32>, b:futures_util::future::ready::Ready<u32> for<'r, 's> {futures_util::future::poll_fn::PollFn<[closure@<::futures_util::async_await::select::select macros>:9:1: 16:43 a:&'r mut futures_util::future::ready::Ready<u32>, b:&'s mut futures_util::future::ready::Ready<u32>]>, ()}]>`
|
= help: the following implementations were found:
<std::future::GenFuture<T> as std::marker::Unpin>
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<i32>>, impl core::future::future::Future, [closure@src/main.rs:18:15: 18:35]>`
= note: required because it appears within the type `impl futures_core::stream::Stream`
I have an idea of what could go wrong, but I'm not sure about it. Maybe the select! {}
macro creates a future that is not Unpin. I tried to verify it by modifying gen_select_fut()
signature to be:
fn gen_select_fut() -> impl Future<Output=u32> + std::marker::Unpin
In that case I get the following compilation error:
$ cargo run
Compiling check_select_unpin v0.1.0 (/home/real/temp/check_select_unpin)
error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:_, b:_ _]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
--> src/main.rs:5:24
|
5 | fn gen_select_fut() -> impl Future<Output=u32> + std::marker::Unpin {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:8:16: 13:6 a:_, b:_ _]>`
|
= help: the following implementations were found:
<std::future::GenFuture<T> as std::marker::Unpin>
= note: required because it appears within the type `impl core::future::future::Future`
= note: the return type of a function must have a statically known size
So I thought gen_select_fut()
is the source of the problem. But, interestingly enough, if I only run gen_select_fut()
using the thread_pool
, I don't get any compilation error. The following code compiles successfully:
#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
use futures::{select, future, Future, stream, Stream, StreamExt};
use futures::executor::ThreadPool;
fn gen_select_fut() -> impl Future<Output=u32> {
let mut a = future::ready(0);
let mut b = future::ready(1);
async move {
select! {
a => a,
b => b,
}
}
}
fn main() {
let mut thread_pool = ThreadPool::new().unwrap();
let res = thread_pool.run(gen_select_fut());
}
Another thing I tried was to implement my own select function for futures, possibly one that will behave better than the select!
macro. (Future 0.1 had such a function, but in Futures 0.3 it is gone?)
This is what it looks like:
fn my_select<T>(a: impl Future<Output=T> + Unpin,
b: impl Future<Output=T> + Unpin) -> impl Future<Output=T> {
let s_a = stream::once(a);
let s_b = stream::once(b);
let s = s_a.select(s_b);
s.into_future()
.map(|(opt_item, _s)| {
opt_item.unwrap()
})
}
Kind of hacky, but if I use it instead of the select!
macro I can make the code compile.
My questions are:
- What is going on here? I am sure there is a bigger explanation for why the resulting Stream is not Unpin.
- Should I "Pin" the Stream to make the code compile. If so, how should I do it?
- If the original
gen_select_fut()
outputs a Future that is not Unpin, how come I can compilethread_pool.run(gen_select_fut())
without a problem?