Sized is not implemented for Future

OK, I've attempted an implementation using two channels to send/recv from my RPCClient... I'm pretty close, but a lot of this is just guessing. Again, I get confused about what's actually going on because the types are all synthetic, so the error messages are near useless... at least to me :expressionless:

Client: https://github.com/wspeirs/logstore/blob/03b7166c49a37c3b512b4ad98a7b2a3f9ceb5290/src/rpc_server.rs#L136

HTTP server: https://github.com/wspeirs/logstore/blob/03b7166c49a37c3b512b4ad98a7b2a3f9ceb5290/src/http_server.rs#L49

I'm struggling to handle the send/recv to the client.

                let result = clients.values().map(move |rpc_client| {
                    let (tx, rx) = rpc_client.get_connection();

                    // make a bogus request, would come from GET request
                    let req = RequestMessage::Get(
                        String::from("method"),
                        LogValue::String(String::from("GET")),
                    );
                    tx.send(req).and_then(|_| rx.into_future())
                });

For some reason the compiler thinks there should be a tuple. I have no idea where that would be coming from:

error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<futures::sync::mpsc::Receiver<rpc_codec::ResponseMessage>> as futures::IntoFuture>::Error == futures::sync::mpsc::SendError<rpc_codec::RequestMessage>`
  --> src/http_server.rs:58:34
   |
58 |                     tx.send(req).and_then(|_| rx.into_future())
   |                                  ^^^^^^^^ expected tuple, found struct `futures::sync::mpsc::SendError`
   |
   = note: expected type `((), futures::sync::mpsc::Receiver<rpc_codec::ResponseMessage>)`
              found type `futures::sync::mpsc::SendError<rpc_codec::RequestMessage>`

For some reason it also believes result to be a Map instead of a Future, even though the last thing returned is rx.into_future() from tx.send(...).and_then(...):

error[E0277]: the trait bound `futures::stream::Map<futures::stream::MapErr<futures::stream::IterOk<std::iter::Map<std::collections::hash_map::Values<'_, u32, rpc_server::RPCClient>, [closure@src/http_server.rs:50:51: 59:18]>, std::io::Error>, [closure@src/http_server.rs:77:34: 77:57]>, [closure@src/http_server.rs:78:30: 82:26]>: futures::Future` is not satisfied
  --> src/http_server.rs:75:17
   |
75 | /                 Box::new(
76 | |                     stream::iter_ok(result)
77 | |                         .map_err(|e| hyper::Error::Io(e))
78 | |                         .map(|rpc_rsp| {
...  |
82 | |                         }),
83 | |                 )
   | |_________________^ the trait `futures::Future` is not implemented for `futures::stream::Map<futures::stream::MapErr<futures::stream::IterOk<std::iter::Map<std::collections::hash_map::Values<'_, u32, rpc_server::RPCClient>, [closure@src/http_server.rs:50:51: 59:18]>, std::io::Error>, [closure@src/http_server.rs:77:34: 77:57]>, [closure@src/http_server.rs:78:30: 82:26]>`
   |
   = note: required for the cast to the object type `futures::Future<Error=hyper::Error, Item=hyper::Response<std::boxed::Box<futures::Stream<Item=hyper::Chunk, Error=hyper::Error>>>>`

Conceptually I understand what a Future is, and what a Stream/Sink is... but I'm blind to what's actually going on because of all these stacked and synthesized types. Besides continuing to bang my head on this, how can I learn to understand what's going on here? I feel like more than half the time I'm just randomly trying combinations of things in the hope that something compiles. For the rest of my Rust code I have a pretty solid understanding of what's going on at each line, but this Tokio stuff just has me lost.

The tuple is coming from rx.into_future(), which returns a StreamFuture: a future that "peels off" the next item from the stream (when it's available). Since the stream may have more items in the future, it also hands the stream back to you - that's where the tuple is coming from. Note the Item and Error associated types for it: StreamFuture in futures::stream - Rust
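To make the (item, rest-of-stream) shape concrete, here's a std-only analogy (illustrative names, not the actual futures API): taking the next item off a sequence hands back both the item and the remainder, because the remainder may yield more later.

```rust
use std::collections::VecDeque;

// Pop the next item; return it together with the rest of the "stream",
// mirroring StreamFuture's Item = (Option<T>, S) shape.
fn take_next<T>(mut stream: VecDeque<T>) -> (Option<T>, VecDeque<T>) {
    let item = stream.pop_front();
    (item, stream)
}
```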

Relatedly, you'll probably need to add a map_err to tx.send(...) to deal with the SendError in your chain. You'll probably want to convert it to whatever internal error API you're using; if you don't convert (or otherwise map) it, the compiler will complain about the error types mismatching.
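The same error-unification problem shows up with plain Results, so here's a std-only sketch of the idea (SendError here is a hypothetical stand-in, not the real channel type): map_err converts the first step's error into the chain's common error type before and_then continues.

```rust
use std::io::{Error as IOError, ErrorKind};

// Hypothetical stand-in for the channel's SendError.
#[derive(Debug)]
struct SendError(String);

// Convert the send-side error into the chain's common error type,
// mirroring what map_err does on a future before and_then.
fn send_then_recv(send_result: Result<(), SendError>) -> Result<String, IOError> {
    send_result
        .map_err(|e| IOError::new(ErrorKind::Other, format!("send failed: {:?}", e)))
        .and_then(|_| Ok(String::from("response")))
}
```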

As for the "drowning in types" feeling that you have, it's perfectly normal to feel this way when working with futures and tokio for the first time. The Rust compiler is known for its good (sometimes great) error messages - that won't be the case with futures, at least today. All I can say is that it'll get easier, mostly because it'll become less foreign and you'll know what the compiler is saying just by scanning the error. Once the futures API becomes more familiar to you, it'll also get easier.

I think I fixed the tuple issues. However, now I'm facing another conceptual problem... how do I return the rx side of the response channel? I have my send/recv setup as follows:

        let connection_future = TcpClient::new(MessageProto)
            .connect(&socket_addr, &handle)
            .and_then(move |client| {
                request_rx
                    .map_err(|_| unreachable!("rx can't fail"))
                    .and_then(move |msg: RequestMessage| {
                        println!("REQUEST: {:?}", msg);
                        let response_tx_inner = response_tx.clone();

                        client.call(msg).and_then(move |response: ResponseMessage| {
                            println!("RESPONSE: {:?}", response);
                            response_tx_inner
                                .send(response)
                                .map_err(|e| IOError::new(ErrorKind::InvalidData, e.to_string()))
                        })
                    })
                    .fold((), |_acc, _| Ok::<(), IOError>(()))
            });

... and my get_connection setup as:

    pub fn get_connection(&self) -> (Sender<RequestMessage>, Receiver<ResponseMessage>) {
        (self.tx.clone(), self.rx)
    }

But I get an error with not being able to borrow self.rx:

error[E0507]: cannot move out of borrowed content
   --> src/rpc_server.rs:179:27
    |
179 |         (self.tx.clone(), self.rx)
    |                           ^^^^ cannot move out of borrowed content

Do I need to wrap this receiving end in a Mutex or something?

I also get an issue about clients not having a static lifetime in https://github.com/wspeirs/logstore/blob/75fa4f56c2b1bc3384e605ccd6689432f8287f1d/src/http_server.rs#L51:

error[E0597]: `clients` does not live long enough
  --> src/http_server.rs:51:21
   |
51 |                     clients.values().map(move |rpc_client| {
   |                     ^^^^^^^ borrowed value does not live long enough
...
99 |     }
   |     - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

I REALLY appreciate all the help you've given me here! Hopefully I'll crack this soon, and then can start building out the RPC commands and then never look at this code again!!! :smiley:

You are not borrowing it here, you are trying to move it out. This means that if you called get_connection twice (which the API allows, since it only takes self by reference), the second call would illegally return a second receiver on an MPSC channel (which does not support this), or maybe a pointer into undefined memory if the new owner decided to free the receiver. The compiler helps you avoid this here.
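If you truly need to hand the receiver out exactly once from a method, one common pattern is storing it in an Option and take()-ing it. A std-only sketch with illustrative names (not the actual RPCClient types):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Illustrative sketch: storing the receiver in an Option lets a
// &mut self method move it out exactly once via Option::take.
struct Client {
    tx: Sender<String>,
    rx: Option<Receiver<String>>,
}

impl Client {
    fn new() -> Client {
        let (tx, rx) = channel();
        Client { tx, rx: Some(rx) }
    }

    // First call hands out the receiver; later calls get None.
    fn get_connection(&mut self) -> (Sender<String>, Option<Receiver<String>>) {
        (self.tx.clone(), self.rx.take())
    }
}
```

Note this changes the signature (&mut self, and an Option in the return type), which is the compiler forcing the "only one receiver" invariant into the API.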

@HadrienG, that makes sense... but how do I do what I actually want to do, which is to provide to the HTTP server an RPC client where I can send/receive messages based upon HTTP requests? HTTP Request -> RPC Request -> RPC Response -> HTTP Response.

Do I need to send the messages "into" the RPC client instead, and return the responses?

The tricky part is to dispatch the messages that are received back to the corresponding HTTP request, and to correctly handle this kind of scenario:

  • HTTP Request A starts, is sent to RPC server
  • HTTP Request B starts, is sent to RPC server
  • RPC server sends a reply to one of the requests

One possible way to do this would be to spawn a single one-shot communication channel per message. Tokio and futures-rs have some primitives for that. In a nutshell, for each request that you want to send to the RPC server...

  • You create a one-shot channel (futures::sync::oneshot)
  • You queue the sending end in RPCClient
  • You send the RPC request to the server
  • You give back the receiving end to the HTTP request handler
  • When an RPC server reply is received, you pop the first sending channel from RPCClient and send the result to the corresponding HTTP request handler to wake it up

This solution works as long as RPC requests can never be reordered in the pipeline (neither by the connection, nor by the server), which is the scenario I think you are operating under.
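The steps above can be sketched with std mpsc channels standing in for futures::sync::oneshot (names are illustrative, not taken from the actual RPCClient):

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

// Minimal sketch of the queue-based dispatch scheme.
struct RpcDispatcher {
    pending: VecDeque<Sender<String>>,
}

impl RpcDispatcher {
    fn new() -> RpcDispatcher {
        RpcDispatcher { pending: VecDeque::new() }
    }

    // Called per HTTP request: queue the sending end, hand the
    // receiving end back to the HTTP request handler.
    fn send_request(&mut self, _req: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.pending.push_back(tx);
        // ...here the real client would write _req to the RPC connection...
        rx
    }

    // Called when a reply arrives: pop the oldest sender (FIFO order)
    // and wake the corresponding HTTP request handler.
    fn on_reply(&mut self, reply: String) {
        if let Some(tx) = self.pending.pop_front() {
            let _ = tx.send(reply);
        }
    }
}
```

The FIFO pop is exactly where the "no reordering" assumption lives: the oldest pending sender is assumed to belong to the oldest outstanding request.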

Ideally, the http_server module should not see the machinery needed for receiving and dispatching the RPC replies. All it should have is a method which allows it to send a request in and get a oneshot channel to listen to as a direct result (or even better, a future of RPC result).

OK, I'll have to look at one-shot channels, and change my logic around how I send messages. I guess I am "exposing" too much to the HTTP server code.

As for the reordering part, I would want this to multiplex... Some RPC messages (insert these 100 log messages) will take much longer than others (fetch this one message by ID). I was thinking I'd eventually implement multiplexing to make this happen.

If you can have a reordering of RPC requests, then you will need to adapt your scheme for mapping RPC replies to the corresponding HTTP requests. Instead of a queue (which is correct for FIFO replies), you could for example have the RPC handler internally keep a HashMap which maps RPC reply keys to the one-shot channel that wakes the corresponding HTTP request.
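A std-only sketch of that keyed variant (again with mpsc channels standing in for one-shot channels, and illustrative names): each request gets an id, and replies are routed by id rather than FIFO order.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Sketch of keyed dispatch for reorderable replies.
struct KeyedDispatcher {
    next_id: u64,
    pending: HashMap<u64, Sender<String>>,
}

impl KeyedDispatcher {
    fn new() -> KeyedDispatcher {
        KeyedDispatcher { next_id: 0, pending: HashMap::new() }
    }

    // Returns the request id (to embed in the RPC frame) plus the
    // receiving end for the HTTP request handler.
    fn send_request(&mut self) -> (u64, Receiver<String>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.pending.insert(id, tx);
        (id, rx)
    }

    // Replies may arrive in any order; the echoed id says whose they are.
    fn on_reply(&mut self, id: u64, reply: String) {
        if let Some(tx) = self.pending.remove(&id) {
            let _ = tx.send(reply);
        }
    }
}
```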

Another thing is, so far I assumed that one HTTP request = one RPC request. If that stops being the case, you may want to switch from single-shot channels to multiple-use ones, and adapt the RPC handler's interface to enable channel reuse.

I don't think there will ever be a case where HTTP and RPC requests aren't mapped 1-to-1. That said, there will be multiple RPC clients, each getting the request.

I would have thought that the future context would keep everything separate for me. How would a response to an RPC request end up in the future of a different HTTP request?

Catching up a bit here ...

If your rpc servers are multiplexed, then it's possible for 2 http requests, H1 and H2, to have their responses from the rpc server come back in arbitrary order. Since you're using futures, H1 and H2 can dispatch concurrently on your http server. The backend rpc server can process them at arbitrary pace, and you don't know which one will complete first. If you have a shared TcpStream to the rpc server, when it receives an rpc response you wouldn't know which http request it's for. You'd need to do correlation. The simplest scheme is to send a uuid to the rpc server and have it echo it back - the http server can then use that to correlate rpc responses to http requests.

If you can have a reordering of RPC requests, then you will need to adapt your scheme for mapping RPC replies to the corresponding HTTP requests.

@HadrienG, my understanding is that when I switch to Multiplex this will all be taken care of at the RPC layer: tokio_proto::multiplex - Rust

If you have a shared TcpStream to the rpc server, when it receives a rpc response you wouldn’t know which http request it’s for. You’d need to do correlation. The simplest scheme is to send a uuid to the rpc server and have it echo it back - the http server can then use that to correlate rpc responses to http requests.

@vitalyd, as I mentioned above, I think Multiplex will take care of all of this for me.

I solved all the connection issues, I hope by doing the following:

type Connection = ClientService<TcpStream, MessageProto>;

pub struct RPCClient {
    address: String,
    conn: Connection
}

impl RPCClient {
    pub fn new(address: String, core: &mut Core) -> RPCClient {
        let socket_addr = address.parse().unwrap();

        // create a handle for the connection
        let handle = core.handle();

        let connection_future = TcpClient::new(MessageProto)
            .connect(&socket_addr, &handle);

        // establish this connection
        let conn = core.run(connection_future).unwrap();

        RPCClient {
            address,
            conn
        }
    }

    pub fn make_request(&self, req: RequestMessage) -> Box<Future<Item=ResponseMessage, Error=IOError>> {
        Box::new(self.conn.call(req))
    }
}

This all seems to compile nicely with Hyper:

                let response_futures = clients.values().map(move |rpc_client| {
                    // make a bogus request, would come from GET request
                    let req = RequestMessage::Get(
                        String::from("method"),
                        LogValue::String(String::from("GET")),
                    );

                    rpc_client.make_request(req)
                });

However, I'm facing the issue now where I'm getting back multiple RPC responses, each of which contains multiple Log messages. What I want is a "flat" stream of Log messages, as strings. Here is my attempt:

                let response = stream::futures_unordered(response_futures)
                    .map_err(|e| Error::Io(e))
                    .map(|resp| {
                        match resp {
                            ResponseMessage::Ok => vec![Value::Null.to_string()],
                            ResponseMessage::Logs(l) => l.into_iter().map(|m| map2json(m).to_string()).collect::<Vec<_>>()
                        }
                    })
                    .flatten()

The problem is that flatten() cannot convert my Vecs into Streams, and so it complains:

error[E0277]: the trait bound `std::vec::Vec<std::string::String>: futures::Stream` is not satisfied
  --> src/http_server.rs:73:22
   |
73 |                     .flatten()
   |                      ^^^^^^^ the trait `futures::Stream` is not implemented for `std::vec::Vec<std::string::String>`

How can I convert the Vec<String>s into a Stream so I can call flatten and get one stream of strings that I can then stream back as the response? Thanks!

You probably want concat2 instead of flatten. This gives you a future of the end result (a single Vec with all items).

Tried that, the problem is that Hyper is looking for a Stream of Chunks to send back to the client. If I can convert into a Stream of Strings, then I can convert those into Chunks.

Ah, then you can try iter_ok - it forms a Stream over an iterable.

Am I missing an import or something?

                let response = stream::futures_unordered(response_futures)
                    .map_err(|e| Error::Io(e))
                    .map(|resp| {
                        match resp {
                            ResponseMessage::Ok => vec![],
                            ResponseMessage::Logs(l) => l.into_iter().map(|m| map2json(m).to_string()).collect::<Vec<_>>()
                        }
                    })
                    .concat2();

                let body: ResponseStream = Box::new(stream::iter_ok::<_, _>(response));

I get the following error:

error[E0277]: the trait bound `futures::stream::Concat2<futures::stream::Map<futures::stream::MapErr<futures::stream::FuturesUnordered<std::boxed::Box<futures::Future<Error=std::io::Error, Item=rpc_codec::ResponseMessage>>>, [closure@src/http_server.rs:66:30: 66:46]>, [closure@src/http_server.rs:67:26: 72:22]>>: std::iter::Iterator` is not satisfied
  --> src/http_server.rs:82:53
   |
82 |                 let body: ResponseStream = Box::new(stream::iter_ok::<_, _>(response));
   |                                                     ^^^^^^^^^^^^^^^^^^^^^^^ `futures::stream::Concat2<futures::stream::Map<futures::stream::MapErr<futures::stream::FuturesUnordered<std::boxed::Box<futures::Future<Error=std::io::Error, Item=rpc_codec::ResponseMessage>>>, [closure@src/http_server.rs:66:30: 66:46]>, [closure@src/http_server.rs:67:26: 72:22]>>` is not an iterator; maybe try calling `.iter()` or a similar method
   |
   = help: the trait `std::iter::Iterator` is not implemented for `futures::stream::Concat2<futures::stream::Map<futures::stream::MapErr<futures::stream::FuturesUnordered<std::boxed::Box<futures::Future<Error=std::io::Error, Item=rpc_codec::ResponseMessage>>>, [closure@src/http_server.rs:66:30: 66:46]>, [closure@src/http_server.rs:67:26: 72:22]>>`
   = note: required because of the requirements on the impl of `std::iter::IntoIterator` for `futures::stream::Concat2<futures::stream::Map<futures::stream::MapErr<futures::stream::FuturesUnordered<std::boxed::Box<futures::Future<Error=std::io::Error, Item=rpc_codec::ResponseMessage>>>, [closure@src/http_server.rs:66:30: 66:46]>, [closure@src/http_server.rs:67:26: 72:22]>>`
   = note: required by `futures::stream::iter_ok`

Sorry, I was on mobile and a bit terse.

I whipped up a little toy example (that sort of mimics your code) for you to play with: (Playground).

But basically, you iter_ok inside your map closure, which returns a Stream of strings; you then flatten that to get a single stream, which you can then (presumably) feed to hyper.
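As a rough std-only analogy for the stream fix: map each response to an iterable of strings, then flatten into one flat sequence. On streams, stream::iter_ok plays the role that into_iter plays here.

```rust
// Map each RPC response (here just a Vec of strings) to its items,
// then flatten the nested structure into one flat sequence - the
// iterator-level shape of the stream-level iter_ok + flatten fix.
fn flatten_responses(responses: Vec<Vec<String>>) -> Vec<String> {
    responses.into_iter().flatten().collect()
}
```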

I got it all to compile, but it doesn't work. It seems like there is some sort of race condition where the futures aren't really waiting for the response:

                let response = stream::futures_unordered(response_futures)
                    .map_err(|e| Error::Io(e))
                    .map(|resp| {
                        debug!("RSP: {:?}", resp);

                        match resp {
                            ResponseMessage::Ok => stream::iter_ok(vec![]),
                            ResponseMessage::Logs(l) => stream::iter_ok(l.into_iter().map(|m| Chunk::from(map2json(m).to_string())).collect::<Vec<_>>())
                        }
                    })
                    .flatten();

                let body: ResponseStream = Box::new(response);

                Box::new(futures::future::ok( Response::new()
                    .with_status(StatusCode::Ok)
                    .with_body(body)
                ))

The request is made to the RPC server, and it reads the records off disk, but I never get a response, and curl just hangs waiting for the body. However, if I change up the request so that the RPC server responds with zero messages, then it all works: it returns an empty body and closes the connection. Thoughts?

Do you see all the requests arriving on the rpc server? Do you see any of the rpc futures resolve (you have a debug log there)? Are you sure the rpc server is actually sending data back?

The RPC server gets the request, and processes it by reading the records from disk. However, I never see it get back to the HTTP/RPC client side as I never see my RSP: {:?} message printed. I do see this message though when I ask for a record that does not exist on disk. That's why it seems like a race condition.

Are you sure your http server's framing code is correct? That is, is it reading and parsing incoming bytes (from the rpc server) into messages correctly? If, for example, you mistakenly indicate that you don't have enough bytes for a frame, it'll just hang.
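The invariant being described can be sketched without any tokio machinery (the 4-byte big-endian length prefix is an assumption for illustration, not the codec from the repo): a decoder must answer "not enough bytes yet" only when the frame really is incomplete, since answering that for a complete frame hangs the connection forever.

```rust
// Returns Some((payload, bytes_consumed)) for a complete frame,
// or None to signal "wait for more bytes".
fn decode_frame(buf: &[u8]) -> Option<(Vec<u8>, usize)> {
    // Assumed wire format: 4-byte big-endian length prefix, then payload.
    if buf.len() < 4 {
        return None;
    }
    let len = ((buf[0] as usize) << 24)
        | ((buf[1] as usize) << 16)
        | ((buf[2] as usize) << 8)
        | (buf[3] as usize);
    if buf.len() < 4 + len {
        return None; // frame incomplete: correct place to wait
    }
    Some((buf[4..4 + len].to_vec(), 4 + len))
}
```

A bug that, say, compared against the wrong length here would make decode_frame return None on a complete frame, which matches the hang-with-data symptom: the empty-response case works because there's nothing to frame.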