Reading just one item from a Stream: Compiling problems


#1

Hi, I am working through the official Rust Tokio tutorials. Specifically, this one: https://tokio.rs/docs/going-deeper-tokio/handshake/
This tutorial talks about creating an echo server that has a handshake in the beginning of the connection. The handshake is added at the protocol level, which is nice, because the service module doesn’t need to know about it happening.

This is the relevant part of the code from the tutorial:

fn bind_transport(&self, io: T) -> Self::BindTransport {
        // Construct the line-based transport
        let transport = io.framed(LineCodec);

        // The handshake requires that the client sends `You ready?`,
        // so wait to receive that line. If anything else is sent,
        // error out the connection
        Box::new(transport.into_future()
            // If the transport errors out, we don't care about
            // the transport anymore, so just keep the error
            .map_err(|(e, _)| e)
            .and_then(|(line, transport)| {
                // A line has been received, check to see if it
                // is the handshake
                match line {
                    Some(ref msg) if msg == "You ready?" => {
                        println!("SERVER: received client handshake");
                        // Send back the acknowledgement
                        let ret = transport.send("Bring it!".into());
                        Box::new(ret) as Self::BindTransport
                    }
                    _ => {
                        // The client sent an unexpected handshake,
                        // error out the connection
                        println!("SERVER: client handshake INVALID");
                        let err = io::Error::new(io::ErrorKind::Other,
                                                 "invalid handshake");
                        let ret = future::err(err);
                        Box::new(ret) as Self::BindTransport
                    }
                }
            }))
    }

The code above works as follows: The server waits for the message “You ready?” from the client, and then replies with “Bring it!”. Only then the echo service begins working.

As an exercise I want to change that code so that the server will be the first one sending the message “You ready?”, and the client will have to reply “Bring it!”. I started rewriting the code but I couldn’t get it to compile. Here is the code:

fn bind_transport(&self, io: T) -> Self::BindTransport {
    // let transport = io.framed(LineCodec);
    let transport = io.framed(LineCodec);

    let fut = transport
        .send("You ready?".into())
        .and_then(|transport| {
            Ok(transport.into_future())
        })
        .and_then(|(line, transport)| {
            match line {
                Some(ref msg) if msg == "Bring it!" => {
                    println!("SERVER: received client handshake");
                    // let ret = transport.send("Bring it!".into());
                    Box::new(future::ok(transport)) as Self::BindTransport
                }
                _ => {
                    println!("SERVER: client handshare INVALID");
                    let err = io::Error::new(io::ErrorKind::Other,
                                             "invalid handshake");
                    let ret = future::err(err);
                    Box::new(ret) as Self::BindTransport
                }
            }
        });

    Box::new(fut)
}

Here are the main compile problems:

$ cargo check
   Compiling conn_handshake v0.1.0 (file:///home/real/projects/learn_tokio/conn_handshake)

error[E0308]: mismatched types
  --> src/line_proto.rs:36:24
   |
36 |             .and_then(|(line, transport)| {
   |                        ^^^^^^^^^^^^^^^^^ expected struct `line_proto::futures::stream::StreamFuture`, found tuple
   |
   = note: expected type `line_proto::futures::stream::StreamFuture<line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>>`
              found type `(_, _)`

error: aborting due to previous error

error: Could not compile `conn_handshake`.

I’m trying to read a single string message from the transport, and then wait for incoming message, verify that it equals “Bring it!” and then hand over the transport.

I read about into_future, and I understand that it turns a Stream into (Option, Stream), so I should be able to read the item and still have a stream to work with later. However, it seems like instead I’m getting a StreamFuture type, and I make it turn into a tuple of an Item and a Stream.

It does seem to work well in the original code, I can’t find what I’m missing here.

I will appreciate any ideas of how to solve this.

The full code is here:https://github.com/realcr/learn_tokio/tree/f7678513b573f990ed767bd1b747830dc38ebc52/conn_handshake

EDIT: Permanent link to full code (Pinned to commit number).


#2

It’s because of this bit in your code:

.and_then(|transport| {
            Ok(transport.into_future())
        })

Note that you’re returning a Result by virtue of the Ok(...). There’s an IntoFuture impl for Result like so:

impl<T, E> IntoFuture for result::Result<T, E> {
    type Future = FutureResult<T, E>;
    type Item = T;
    type Error = E;

    fn into_future(self) -> FutureResult<T, E> {
        result(self)
    }
}

and_then requires that you give it a FnOnce(Self::Item) -> ..., and Self::Item here is the StreamFuture<...> that you got from transport.into_future(). So, you don’t actually have a StreamFuture where you think you do, but rather a FutureResult that contains a StreamFuture.


#3

Thank you for the reply @vitalyd.

I’m still confused. I think that you explanation could apply to the original example code, where everything compiles correctly.

I tried another modification:

let fut = transport
    .send("You ready?".into())
    .into_future()
    .and_then(|(line, transport)| {
        match line {
            Some(ref msg) if msg == "Bring it!" => {
                println!("SERVER: received client handshake");
                // let ret = transport.send("Bring it!".into());
                Box::new(future::ok(transport)) as Self::BindTransport
            }
            _ => {
                println!("SERVER: client handshare INVALID");
                let err = io::Error::new(io::ErrorKind::Other,
                                         "invalid handshake");
                let ret = future::err(err);
                Box::new(ret) as Self::BindTransport
            }
        }
    });

This time I get this error:

error[E0308]: mismatched types
  --> src/line_proto.rs:39:24
   |
39 |             .and_then(|(line, transport)| {
   |                        ^^^^^^^^^^^^^^^^^ expected struct `line_codec::tokio_io::codec::Framed`, found tuple
   |
   = note: expected type `line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>`
              found type `(_, _)`

error: aborting due to previous error

#4

Let’s look at the types involved in:

let fut = transport
    .send("You ready?".into())
    .into_future()

transport starts out as a Framed<...>, which is a Stream. into_future on a Stream returns a StreamFuture. A StreamFuture's Item associated type is the one with the tuple. So before you can call and_then that wants a tuple, you need to make sure you’re working with a StreamFuture specifically, and not some other type of Future impl.

transport.send(...) above returns a https://docs.rs/futures/0.1.14/futures/sink/struct.Send.html. Note that this struct implements Future, but its Item associated type is not the tuple. Your into_future() on it is essentially a no-op.

So you probably want to do transport.into_future() first to get a StreamFuture and then chain continuations onto that, or something like that. But basically, you need a StreamFuture for the and_then you’re looking for, AFAICT.

Note how the docs example you’re modeling after pretty much does that - it calls transport.into_future() first (gets the StreamFuture), then does a map_err call (which preserves the StreamFuture type), and then calls and_then - since it’s a StreamFuture, that works.


#5

If I try to do transport.into_future() first, I will not be able to use the send method. For example:

let fut = transport
    .into_future()
    .send("You ready?".into())

Will give the compile error:

error[E0599]: no method named `send` found for type `line_proto::futures::stream::StreamFuture<line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>>` in the current scope
  --> src/line_proto.rs:31:14
   |
31 |             .send("You ready?".into())
   |              ^^^^
   |
   = note: the method `send` exists but the following trait bounds were not satisfied:
           `line_proto::futures::stream::StreamFuture<line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>> : line_proto::futures::Sink`

And I can understand why. Using into_future() over transport allows to read one message, and turns it into StreamFuture. I need some form of a Sink if I want to be able to send a message.

Yes, but they read a message first and then send a message. I want to first send a message and then read a message.

@vitalyd: I hope that I understood everything that you wrote correctly. Do you have other ideas? I was trying to perform strange mutations over my code for a whole day now, though I can’t make it work.


#6

Right, I was just trying to explain the compilation error and the difference from the tokio docs.

So if you just want to send a message and then chain a continuation onto it when it resolves, I think you should just work with the Send struct that’s returned from transport.send() - this is a Future that resolves once the message has been sent. The Item associated type of Send is the Framed value itself, which is the transport you started with. So, you can probably tack on a continuation to that which initiates reading of a response. Maybe something like this:

transport.send("You ready?".into())
         .map(|transport| transport.into_future()) // transform to a StreamFuture
         .and_then(|(line, transport) ...) // now work with the StreamFuture pretty much as the tokio docs

The above is totally untested, but I think it should be something like that.


#7

@vitalyd, thanks for the time you took to reply again.
I tried your suggestion, and it also doesn’t compile. I’m not on my work computer now so I will send here later the compile errors.

In addition, I think that map is not the right thing to do here. If I understand correctly, map does some operation over all the items in a Stream, and this is not what I want. I just want to read the first item in the stream, and leave the rest of the stream untouched, because it is used for the rest of the protocol.


#8

Sure, let’s try to iterate on this.

The map here was against a single Future (the Send struct) - it allows you to convert the item returned by the future to something else. That something else here is to turn it into a StreamFuture since Send resolves to the Framed struct itself (your transport binding).


#9

Hi @vitalyd, good morning!
I tried the code snippet as suggested:

transport.send("You ready?".into())
         .map(|transport| transport.into_future()) // transform to a StreamFuture
         .and_then(|(line, transport) ...) // now work with the StreamFuture pretty much as the tokio docs

I get this compile error:

error[E0308]: mismatched types
  --> src/line_proto.rs:45:24
   |
45 |             .and_then(|(line, transport)| {
   |                        ^^^^^^^^^^^^^^^^^ expected struct `line_proto::futures::stream::StreamFuture`, found tuple
   |
   = note: expected type `line_proto::futures::stream::StreamFuture<line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>>`
              found type `(_, _)`

This is strange. I still can’t manage to split the transport into a line and a transport. What do you think?
I should probably create a very simple program that demonstrates this problem.


#10

Yeah, sorry - the snippet I gave is incorrect because, once again, we don’t yet have the StreamFuture but rather a wrapper around it.

Try the following:

transport.send("You ready?".into())
         .map(|transport| transport.into_future())
         .and_then(|transport| transport.map_err(|(e, _)| e)
                                  .and_then(|(line, transport)| { ...}))

Basically, we get access to the StreamFuture inside the “outer” and_then.


#11

@vitalyd: It worked! Thank you so much. I have no idea how long it would have taken me to make the code compile on my own.

I still don’t understand two things though:

  1. What is going on with the first map, and why isn’t it and_then?

  2. The map_err thing is very surprising. First, I didn’t realise that it should be nested inside an “and_then” clause. Next, without it I get really strange compile messages. I could have never guessed that map_err is missing. Here is an example for such a compile error. For the code (removed the map_err):

transport.send("You ready?".into())
         .map(|transport| transport.into_future())
         .and_then(|transport| transport)
         .and_then(|(line, transport)| { ...}))

I get this compile error:

error[E0271]: type mismatch resolving `<line_proto::futures::stream::StreamFuture<line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>> as line_proto::futures::IntoFuture>::Error == std::io::Error`
  --> src/line_proto.rs:32:14
   |
32 |             .and_then(|transport| transport)
   |              ^^^^^^^^ expected tuple, found struct `std::io::Error`
   |
   = note: expected type `(std::io::Error, line_codec::tokio_io::codec::Framed<T, line_codec::LineCodec>)`
              found type `std::io::Error`

How could I infer from this error message that I should have done an extra map_err?

I tried to write a simple and self contained version of this problem, just to make sure I can get it right. It might be helpful for the next readers of it. This code generates a dummy stream, takes the first element using into_future, and then prints the rest of the stream. Here it is:

#![feature(conservative_impl_trait)]

extern crate tokio_core;
extern crate futures;

use std::fmt;

use tokio_core::reactor::Core;
use futures::{Stream, Future};
use futures::{stream};

/// Generate a stream of numbers from 0 .. x
fn gen_stream(x: usize) -> impl Stream<Item=usize, Error=()> {
    let my_stream = stream::iter::<_,usize,()>(
        (0 .. x).map(|x| Ok(x))
    );
    my_stream
}

/// A future for printing a whole stream.
fn print_stream<S,T,E>(stream: S) -> impl Future<Item=(), Error=E>
    where S: Stream<Item=T, Error=E>,
          T: fmt::Display {

    stream.for_each(|x| {
        // Print each element:
        print!("{} ", x);
        Ok(())
    }).and_then(|_| {
        // After all elements were printed, we print a newline:
        println!();
        Ok(())
    })
}

fn main() {

    let mut core = Core::new().unwrap();
    let my_stream = gen_stream(100);

    let my_future = my_stream
        .into_future()
        .and_then(|(opt_num, my_stream)| {
            match opt_num {
                Some(num) => println!("num = {}", num),
                None => println!("No elements to read!"),
            };
            Ok(my_stream)
        }).map_err(|(e, _)| e) 
        .and_then(|my_stream| {
            print_stream(my_stream)
        });

    core.run(my_future).unwrap()
}

Output:

$ cargo run
   Compiling stream_games v0.1.0 (file:///home/real/projects/learn_tokio/stream_games)
    Finished dev [unoptimized + debuginfo] target(s) in 1.51 secs
     Running `target/debug/stream_games`
num = 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 

#12

map is a transformation combinator - it doesn’t initiate any async action, but simply transforms an already resolved result (future) to something else. and_then is to spawn additional async actions, rather than perform a simple transformation.

Yeah, there’s quite a bit of type soup going on here. The gist is that the StreamFuture::Error associated type is a tuple: type Error = (S::Error, S) (S here is the underlying Stream type). However, we’re saying that we return a BindTransport type (that’s from the function signature fn bind_transport(&self, io: T) -> Self::BindTransport).

BindTransport is, in turn, a type alias: type BindTransport = Box<Future<Item = Self::Transport, Error = io::Error>>;. Crucially, note that the error type is std::io::Error. So we end up with a type mismatch: we want to return a simple std::io::Error but the StreamFuture we’re applying combinators to has a different error type, namely the tuple. So, map_err is used to, effectively, transform the error type from (S::Error, S) to just S::Error (i.e. we “discard” the S and therefore discard the tuple, leaving us with just S::Error). The S::Error, if you trace back through the combinators is the std:io::Error that LineCodec returns (as part of its Encoder and Decoder impls).

This is a bit hairy, so let me know if I can try to clear something up.


#13

I think that I understand it now.
To prove it, I transformed the code provided:

transport.send("You ready?".into())
         .map(|transport| transport.into_future())
         .and_then(|transport| transport.map_err(|(e, _)| e)
         .and_then(|(line, transport)| { ... })

Into an equivalent compiling version:

transport.send("You ready?".into())
            .and_then(|transport| transport.into_future().map_err(|(e,_)| e))
            .and_then(|(line, transport)| { ... })

Thank you for the patience in explaining everything!
real.