[Solved] Transforming a futures::stream to a Future with some transformations applied


#1

The compiler is being super vague, and I’ve tried many variations on the code below. Basically, hyper master has a function:

fn run_until<F>(self, shutdown_signal: F) -> Result<()> 
    where F: Future<Item=(), Error=Error>

I want to call that with a future that sleeps repeatedly, checks something every time it wakes up, and, if the check fails, yields a value on the stream. As I understand it, into_future is a good way to turn taking the next value from a stream into a future.

So this is what I came up with:

    fn run(action: Proc) -> NilClass {
        // ...
        let server = Http::new().bind(&addr, RubyHandler { action: Arc::new(action) }).expect("Could not bind");
        let timer = Timer::default();
        let ruby_done = timer.interval(Duration::from_millis(1)).filter_map(|_| {
          match itself.send("tick", vec!{})
            .try_convert_to::<Boolean>()
            .expect("Expected #tick to return a boolean")
            .to_bool() {
              false => Some(()),
              true => None
          }
        }).map_err(|_| hyper::Error::Timeout ).into_future();
        server.run_until(ruby_done);
        NilClass::new()
    }

So, the timer interval has Items of some kind and Errors of some kind. I filter_map over that to produce an Item only when the check fails (tick returns false), the Item value being (). I follow that up with a map_err to make sure Error is of type hyper::Error.

Am I expressing that correctly?

The compiler error I get is as follows. Note that the compiler seems to have the expected/found types switched, which looks like a compiler bug to me. Besides that, it’s not clear to me what is wrong:

error[E0271]: type mismatch resolving <futures::stream::StreamFuture<futures::stream::MapErr<futures::stream::FilterMap<tokio_timer::Interval, [closure@src/lib.rs:182:77: 190:10 itself:_]>, [closure@src/lib.rs:190:20: 190:45]>> as futures::Future>::Item == ()
   --> src/lib.rs:191:16
    |
191 |         server.run_until(ruby_done);
    |                ^^^^^^^^^ expected tuple, found ()
    |
    = note: expected type (std::option::Option<()>, futures::stream::MapErr<futures::stream::FilterMap<tokio_timer::Interval, [closure@src/lib.rs:182:77: 190:10 itself:_]>, [closure@src/lib.rs:190:20: 190:45]>)
               found type ()

error[E0271]: type mismatch resolving <futures::stream::StreamFuture<futures::stream::MapErr<futures::stream::FilterMap<tokio_timer::Interval, [closure@src/lib.rs:182:77: 190:10 itself:_]>, [closure@src/lib.rs:190:20: 190:45]>> as futures::Future>::Error == hyper::Error
   --> src/lib.rs:191:16
    |
191 |         server.run_until(ruby_done);
    |                ^^^^^^^^^ expected tuple, found enum hyper::Error
    |
    = note: expected type (_, futures::stream::MapErr<futures::stream::FilterMap<tokio_timer::Interval, [closure@src/lib.rs:182:77: 190:10 itself:_]>, [closure@src/lib.rs:190:20: 190:45]>)
               found type hyper::Error

error: aborting due to 2 previous errors

Does that make sense? What does it want?


#2

Not sure if this is the only problem, but one that I spotted is this: calling into_future() creates a StreamFuture, which has:

type Item = (Option<S::Item>, S)
type Error = (S::Error, S)

This means ruby_done implements Future<Item=(Option<…>, …), Error=(…, …)>. This clearly does not match what run_until wants, which is Future<Item=(), Error=Error>.
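
To make the shape mismatch concrete, here is a minimal sketch that uses only std and models what a StreamFuture resolves to as a plain Result. RestOfStream, Error and discard_stream are made-up stand-ins, not types or functions from futures or hyper; the point is only that the leftover stream has to be dropped from both the Item side and the Error side.

```rust
// Stand-in for the stream that is handed back once `into_future()` resolves.
// Hypothetical type, used only to make the tuple shapes concrete.
#[derive(Debug, PartialEq)]
struct RestOfStream;

// Stand-in for `hyper::Error`; only the variant from the question is modelled.
#[derive(Debug, PartialEq)]
enum Error {
    Timeout,
}

// What a `StreamFuture<S>` resolves to: the first item plus the remaining
// stream on success, or the error plus the remaining stream on failure.
type StreamFutureOutput = Result<(Option<()>, RestOfStream), (Error, RestOfStream)>;

// Drop the leftover stream from both sides so the shape becomes
// `Result<(), Error>`, i.e. `Item = ()` and `Error = Error`.
fn discard_stream(output: StreamFutureOutput) -> Result<(), Error> {
    output
        .map(|(_first_item, _rest)| ())
        .map_err(|(err, _rest)| err)
}
```

With the real futures 0.1 combinators, the same two steps would presumably be chained after into_future() as .map(|_| ()).map_err(|(e, _)| e). That is an untested assumption here, not something compiled against hyper master.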


#3

Ahh thanks so much, that was it. I really was blind to that… maybe it was just my fault but it seemed like the compiler was really confused about this.


#4

It’s not so much that the compiler is confused; these huge types just make the error message very verbose and hard to read. Hopefully the upcoming impl Trait feature will make these situations less common.
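
As a rough illustration of what impl Trait in return position does, here is a sketch using a std iterator chain rather than futures. failed_ticks is a made-up name. The concrete return type is a nested adaptor type wrapping a closure, much like the StreamFuture<MapErr<FilterMap<…>>> above, but the signature only names the trait it implements.

```rust
// Hypothetical helper: yields the index of every tick whose check failed.
// The concrete return type is a nested adaptor type that wraps a closure
// and cannot be written out by name; `impl Iterator` hides it behind the
// trait it implements.
fn failed_ticks(checks: Vec<bool>) -> impl Iterator<Item = usize> {
    checks
        .into_iter()
        .enumerate()
        .filter_map(|(index, ok)| if ok { None } else { Some(index) })
}
```

A caller can write failed_ticks(vec![true, false]).collect::<Vec<_>>() without ever seeing or spelling out the adaptor type.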