Sized is not implemented for Future


#1

I’m trying to take a list of structs (RPCClient) and convert them into Futures using map: self.0.iter().map(move |rpc_client: &RPCClient| { ...});

However, I get the following error message:

the trait bound `futures::Future<Item=rpc_codec::ResponseMessage, Error=std::io::Error>: std::marker::Sized` is not satisfied

help: the trait `std::marker::Sized` is not implemented for `futures::Future<Item=rpc_codec::ResponseMessage, Error=std::io::Error>`

After searching, I found a few indications that I should simply add use futures::Future; to my module, and it would work.

I’ve done this, and I’m still getting the error. From what I can tell, Future does not explicitly remove the bound, and neither do std::io::Error or rpc_codec::ResponseMessage, so I’m not sure why the compiler would think that Future does not bind Sized.

I also tried boxing the result of my map() call, and I get a similar error:

the method `new` exists but the following trait bounds were not satisfied: 
`futures::Future<Item=rpc_codec::ResponseMessage, Error=std::io::Error> : std::marker::Sized`

Is this a compiler bug? Any thoughts on how to make this work?


#2

Future is a trait, which means it itself is not Sized - it’s not something you return from a function by value (you return some concrete type that implements it).

Can you show more of your code, including the function signature?
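
To make the “bare traits aren’t Sized” point concrete, here’s a minimal std-only illustration with Display standing in for Future (in 2017-era code the boxed fix for your case would read Box<Future<Item=..., Error=...>>; modern Rust spells the trait object with dyn):

```rust
use std::fmt::Display;

// `Display` stands in for `Future`: a bare trait has no compile-time
// size, so it cannot be a by-value return type.
// fn make() -> Display { 5 }  // error: `Display: Sized` is not satisfied

// Boxing the trait object gives the return type a known size:
fn make() -> Box<dyn Display> {
    Box::new(5)
}

fn main() {
    assert_eq!(make().to_string(), "5");
}
```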


#3

OK, then I’m fundamentally misunderstanding how to do this. What I have is a Hyper HTTP server that will receive requests, and then dispatch said requests to a vector of RPC clients. An RPC client is a tokio TcpClient. Given a mpsc channel that I include in the client, I know how to produce a result (https://github.com/wspeirs/logstore/blob/devel/src/main.rs#L89):

        let res =
            rx.map_err(|e| unreachable!("rx can't fail"))
                .and_then(move |rpc_req| {
                    client.call(rpc_req).and_then(|response| {
                        debug!("RPC RSP: {:?}", response);
                        Ok(response)
                    })
                })
                .fold(ResponseMessage::Ok, |_acc, rsp| Ok::<ResponseMessage, IOError>(rsp));

And so I was trying to link that together such that I could send a message via the channel, and then get the result from this future (https://github.com/wspeirs/logstore/blob/devel/src/http_server.rs#L46):

                let all_sends = self.0.iter().map(move |rpc_client: &RPCClient| {
                    let client = rpc_client.clone();

                    let req = RequestMessage::Get(String::from("method"), LogValue::String(String::from("GET")));
                    let send = client.tx.send(req);

                    *client.res
                });

I’m wondering if I need 2 channels, one to send the RequestMessage on and one to receive the ResponseMessage back from? However, I don’t know where I’d run the future that performs the actual request.

All the code can be found here: https://github.com/wspeirs/logstore/tree/devel/src Thanks!


#4

At a high level, here’s how I imagine your code working:

  1. Your hyper http server receives a request.
  2. This request is forwarded to all of the RpcClient instances.
  3. Each RpcClient::call returns some implementation of Future.
  4. The hyper request handler uses future::join_all to wrap all those futures into one future that resolves when all of the underlying futures (ie the rpc calls) resolve.
  5. The hyper request handler returns that JoinAll future, which will then be driven by the tokio event loop.

You can also use stream::futures_unordered if you want to turn all the individual rpc futures into a Stream, and then apply something like for_each to the stream if you want to take additional action as the rpc calls complete.
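
In futures 0.1 the actual call in step 4 would look roughly like future::join_all(self.0.iter().map(|c| c.call(req.clone()))), yielding one future of a Vec of responses. As a std-only illustration of the semantics (collecting Results has the same all-or-nothing shape as JoinAll):

```rust
// Collecting an iterator of Results mirrors join_all: the aggregate
// succeeds only if every element succeeds.
fn join_all_like(calls: Vec<Result<i32, &'static str>>) -> Result<Vec<i32>, &'static str> {
    calls.into_iter().collect()
}

fn main() {
    assert_eq!(join_all_like(vec![Ok(1), Ok(2)]), Ok(vec![1, 2]));
    // One failed "rpc call" fails the aggregate, like JoinAll
    // resolving to its first error.
    assert_eq!(join_all_like(vec![Ok(1), Err("rpc failed")]), Err("rpc failed"));
}
```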

It looks like you’re trying to deref a Rc<Future<...>> stored inside the RpcClient, and return that. That’s not the way you’d normally write futures-based code.

I don’t think you need a channel to send a request to the rpc clients - you can just call the call method on the client and pass the request (possibly a clone of it for each client).

The RpcClient would use whatever tokio constructs (eg TcpStream or maybe some Framed instance on top of a TcpStream) to issue the rpc call to the remote end.

This might be a bit too high level so let me know and I’ll try to lower it to your code a bit better (I only glanced at it).


#5

I re-worked things, and I think I simplified it a lot. I got rid of the channels and simply have a Vec<Connect> which I pass to the Hyper HTTP server. I then handle all of the RPC sending/receiving in the HTTP server. The problem I have now is that I’m unable to get an iterator over the Vec<Connect> (https://github.com/wspeirs/logstore/blob/ec3bdd02ffc6a896bbcd9496cdfe34f77dcea990/src/http_server.rs#L49):

                let clients = self.0.clone();
                let all_sends = clients.into_iter().map(move |rpc_client: Connect<Pipeline, MessageProto>| {
                    let req = RequestMessage::Get(String::from("method"), LogValue::String(String::from("GET")));
                    let client = rpc_client.wait().unwrap();

                    client.call(req).and_then(|response| {
                        debug!("RPC RSP: {:?}", response);
                        Ok(response)
                    })
                });

Results in:

error[E0507]: cannot move out of borrowed content
  --> src/http_server.rs:49:33
   |
49 |                 let all_sends = clients.into_iter().map(move |rpc_client :Connect<Pipeline, MessageProto>| {
   |                                 ^^^^^^^ cannot move out of borrowed content

How do I get around this? I would have thought the clone() on my Rc would have done what I wanted/needed here.


#6

Rc only gives you an immutable reference to the value it’s holding. Vec::into_iter() takes self as a parameter - that means it wants to consume self. To do that, you’d need to own the Vec but you only have a borrow of it.

Do you need to have the client by value or can a shared reference be used?

Also, I noticed you have a rpc_client.wait() call in there. If this is what I think it is (ie a blocking call) you will likely lock up the reactor because you’ll block the thread that’s servicing the reactor (event loop). Why do you need that? Typically you “wait” for a future by chaining on a continuation that runs when it resolves, and let the reactor handle the scheduling.
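
To make the non-blocking alternative concrete, here’s a rough sketch in the futures 0.1 style this thread uses (not a compilable unit; rpc_client, req, and handle stand in for your variables):

```
// Blocking - ties up the reactor thread until the connect finishes:
// let client = rpc_client.wait().unwrap();

// Non-blocking - chain a continuation; the reactor runs it when the
// connect future resolves:
let fut = rpc_client.and_then(move |client| client.call(req));
handle.spawn(fut.map(|_| ()).map_err(|_| ()));
```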


#7

Thank you again for the help! What I’d really like to do is connect to all of my endpoints before I do anything. However, I’m not seeing a way to do this as the core needs to be running. I’m trying to shoe-horn this into the HTTP service by calling wait(). So really what I want is this:

  • Attempt to establish a connection for each of my addresses: https://goo.gl/ueHkYr
  • If any of them fail, then simply bail-out and/or remove them from the list of clients
  • Pass this Vec of already connected clients to my HTTP service
  • During the call() of the HTTP service, make all my calls to the RPC clients and process the responses

I’m pretty sure, given the right combinations of references and mutables, I can do this last part in an asynchronous way. What I don’t know how to do is attempt to connect all the endpoints, and then place all those connections into a vector that my HTTP service can use.

Do you need to have the client by value or can a shared reference be used?

I think a shared reference would be fine as ClientService's call method is: fn call(&self, req: P::Request).


#8

So you essentially want some setup code to run, which connects all your endpoints, before you start servicing http requests, right? If so, the easiest way to do that would be to Core::run a future (or chain of) that connects all your endpoints. Core::run returns you the value that the future resolves to. In your case, that’ll be a Vec of your endpoints.

Once you have that, you then start your http server and run your empty future. In other words, you invoke Core::run twice, once to get the setup done and then another time for the rest of your server.
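
A rough sketch of that two-phase bootstrap (tokio-core 0.1 era; connect_all and serve_http are hypothetical names for your setup and server futures):

```
let mut core = Core::new().unwrap();
let handle = core.handle();

// Phase 1: resolve a future (eg join_all over TcpClient::connect calls)
// into the connected clients.
let clients = core.run(connect_all(&handle)).unwrap();

// Phase 2: start hyper with the connected clients and run the reactor
// for the lifetime of the server.
core.run(serve_http(clients, &handle)).unwrap();
```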

Yes, that looks perfect for a shared reference.


#9

That makes sense. I’m getting hung up on trying to unwrap the Result from run():

    let connections = ["127.0.0.1:12345", "127.0.0.1:23456"].iter().map(|addr| {
        let addr = addr.parse().unwrap(); // parse the iterated address, not a hard-coded one
        let handle = core.handle();

        info!("Attempting to connect to {}", addr);

        TcpClient::new(MessageProto).connect(&addr, &handle).and_then(|client| Ok(client))
    }).collect::<Vec<_>>();

    let client_stream: IterOk<_, IOError> = stream::iter_ok(connections.iter());

    let clients = core.run(client_stream.into_future()).unwrap();

Gives me the following error:

error[E0599]: no method named `unwrap` found for type `std::result::Result<(std::option::Option<&futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, futures::stream::IterOk<std::slice::Iter<'_, futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, std::io::Error>), (std::io::Error, futures::stream::IterOk<std::slice::Iter<'_, futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, std::io::Error>)>` in the current scope
   --> src/main.rs:105:57
    |
105 |     let clients = core.run(client_stream.into_future()).unwrap();
    |                                                         ^^^^^^
    |
    = note: the method `unwrap` exists but the following trait bounds were not satisfied:
            `(std::io::Error, futures::stream::IterOk<std::slice::Iter<'_, futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, std::io::Error>) : std::fmt::Debug`

For some reason it needs std::fmt::Debug to be implemented… why’s that?


#10

That’s how Result::unwrap is defined (you’ll see it’s in an impl block for Result where E: Debug). That’s because the error message if the result is an error is the error’s Debug representation.

Use run(...).ok().unwrap() instead (or a match statement). Alternatively, try to figure out which part of your Error type doesn’t implement Debug (and thus makes the whole thing not Debug).
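
A minimal std-only illustration of why the bound matters (NoDebug is a hypothetical error type):

```rust
// `Result::unwrap` is only defined where `E: Debug`, because a failed
// unwrap panics with the error's Debug representation.
struct NoDebug; // hypothetical error type that does not derive Debug

fn get() -> Result<i32, NoDebug> {
    Ok(5)
}

fn main() {
    // get().unwrap();          // error: `NoDebug` doesn't implement `Debug`
    let v = get().ok().unwrap(); // `.ok()` drops the error type first
    assert_eq!(v, 5);
}
```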


#11

@vitalyd, I thought about this problem more, and I’m going to need to support clients that come and go, because the corresponding server may die. As an attempt at this, I went with the following:

pub enum RPCConnection {
    Disconnected,
    Connected(???)
}

pub struct RPCClient {
    address: String,
    handle: Handle,
    connection: RPCConnection
}

impl RPCClient {
    pub fn new(address: String, handle: Handle) -> RPCClient {
        RPCClient{ address, handle, connection: RPCConnection::Disconnected }
    }

    pub fn get_connection(&mut self) -> RefCell<Connect<Pipeline, MessageProto>> {
        match self.connection {
            RPCConnection::Disconnected => {
                let socket_addr = self.address.parse().unwrap();
                let client =
                    TcpClient::new(MessageProto).connect(&socket_addr, &self.handle).and_then(|c| c);
                self.connection = RPCConnection::Connected(client);

                client
            },
            RPCConnection::Connected(c) => c
        }
    }
}

I have 2 problems:

  1. I haven’t a clue what type to use for the Connected variant of RPCConnection. How do I go about finding out the type that results from TcpClient::new(MessageProto).connect(&socket_addr, &self.handle).and_then(|c| c)? What strategies do you use to figure out the type of something as complex as this?
  2. I know I’m going to run into all kinds of move/borrow issues with my get_connection function. I think you can see what I’m trying to accomplish: if the client is disconnected, then make the connection and stash that away in RPCClient's RPCConnection, if not, then just grab it from RPCConnection and return it. This fundamentally feels like the wrong way to go in Rust because RPCConnection is going to own the client, and so there isn’t much I can do after that.

What am I missing here? How would you go about setting up something like this? Thanks!


#12

Right, this is a tricky part. I’m on mobile right now so may leave you hanging with follow-up questions, so go ahead and ask away and I’ll get back to them.

Let me try to first help you with the high level question of how to “name” complex types (that you’ll be surrounded by when using futures and tokio).

When you start calling the various combinators on futures (eg and_then, then, map, map_err, etc) you start creating a “type cake”. Each of these combinators is represented by a distinct type/struct (eg AndThen, Then, Map, MapErr for the examples in the previous sentence). Each of these types is a generic type and will be taking a closure from you that performs whatever work is needed. Once you add a closure to these types (which happens basically right away) these types are no longer nameable: the compiler synthesizes a struct to represent the closure but this is a synthetic type - you cannot name it in code. This is one of the important things to understand.

The second important thing to understand about these combinator structs is that most of them will implement the Future (or the Stream) trait, so long as certain generic type bounds are met. Some tokio types (eg TcpStream) may implement additional traits, such as AsyncWrite and/or AsyncRead. A lot of them will also implement the Stream and/or Sink trait. You should familiarize yourself with all of these traits if you intend to do serious work with futures and tokio.

This brings us to the following, which hopefully helps with your first question. If you need to store one of these type monsters in your own struct, you have (on stable Rust today) basically two choices:

  1. Make your struct/enum generic as well. For example:
pub enum RPCConnection<A: AsyncWrite + AsyncRead> {
    Disconnected,
    Connected(A),
}

This allows you to put any type, nameable or not, into the Connected variant so long as it impls those two traits. You should pick whichever trait(s) bounds make sense; eg you may want A: Stream<Item=MyRequests, Error=...> + Sink<SinkItem=...>. Or maybe you want F: Future<Item=...>. You see where I’m going with this.

There are two important bits to know about this approach. First, you’ll likely get good performance because the compiler will monomorphize these generic types and it’ll have full understanding of the types involved = no dynamic dispatch. Second, the generic type parameter becomes contagious - other types that want to use RPCConnection now also need to deal with the type parameter somehow, possibly becoming generic themselves. Turtles all the way down and all that …

  2. Erase the types by putting them into a Box. Sticking with the previous example:
pub trait AsyncReadWrite: AsyncRead + AsyncWrite {}

pub enum RPCConnection {
   Disconnected,
   Connected(Box<AsyncReadWrite>),
}

You’ll note a few things. First, RPCConnection has no generic type parameter - we’ve obviated the need for that because we’re holding a trait object now, with type information erased. There’s also the restriction that trait objects cannot use multiple trait bounds (apart from the OIBIT ones like Send, Sync), so we can’t say Box<AsyncRead + AsyncWrite>. Instead, we need to define a unification trait that combines these two. This wouldn’t be needed if you wanted to store just a boxed future or stream though.

So this approach stops the generic type proliferation but besides the multiple trait bounds restriction, you now also get allocations (for the Box) and indirection (ie virtual dispatch).

These two approaches need to be weighed against each other for their pros/cons as they apply to your specific circumstances. Feel free to experiment and see what feels best.

As for question 2, yeah, I see where you’re trying to go. Here I would probably recommend that you explore the world of implementing your own futures. The case you have seems very similar to tokio’s TcpStream and TcpStreamNew. The latter is basically a custom struct that implements Future and resolves to the TcpStream once the async connection has completed (or failed). You have a similar case: an RPCClient (ie TcpStream) and you can have a RPCClientNew which represents a pending connect of the RPCClient.

You may want to have a get_connection() that’s not a method on RPCClient but rather an associated function or a free function inside your module. Again, somewhat similar to TcpStream::connect (this is an associated function).
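
For illustration, a hand-written future in that style might look like this (a sketch only, in the futures 0.1 style; RPCClientNew and RPCClient::from_stream are hypothetical names):

```
pub struct RPCClientNew {
    inner: TcpStreamNew, // the pending connect
}

impl Future for RPCClientNew {
    type Item = RPCClient;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // try_ready! propagates NotReady and errors; otherwise we get
        // the connected TcpStream and resolve to a ready RPCClient.
        let stream = try_ready!(self.inner.poll());
        Ok(Async::Ready(RPCClient::from_stream(stream)))
    }
}
```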

Ok, let me stop here as my fingers are getting a bit tired. Hopefully that frames things a bit better and puts you on the right path. As mentioned, fire away questions and I’ll try to answer later today or tomorrow.


#13

I got rid of my RPCConnection and went with TcpStreamNew inside of my RPCClient instead, as it basically has all the state info I’d want. Kinda a bummer there isn’t the equivalent for TcpClient, as now I have to frame it up with my codec, but not a huge deal. So now I have:

pub struct RPCClient {
    address: String,
    connection: Rc<TcpStreamNew>
}

impl RPCClient {
    pub fn new(address: String, handle: Handle) -> RPCClient {
        let socket_addr = address.parse().unwrap();

        RPCClient{ address, connection: Rc::new(TcpStream::connect(&socket_addr, &handle))
        }
    }

    pub fn get_connection(&self) -> Rc<TcpStreamNew> {
        self.connection.clone()
    }
}

However, I’m running into a borrow issue, and a Display issue when I try to use it in my HTTP server code:

    fn call(&self, req: Request) -> Self::Future {
        let clients = self.0.clone();

        match(req.method(), req.path()) {
            (&Method::Get, "/") => {

                let results = clients.values().into_iter().map(move |rpc_client| {
                    let client = rpc_client.get_connection();
                    let req = RequestMessage::Get(String::from("method"), LogValue::String(String::from("GET")));

                    client.and_then(|c| {
                        c.framed(ClientCodec::new()).send(req).and_then(|response| {
//                            debug!("RPC RSP: {}", response);
                            Ok(response)
                        })
                    })
                });

// snip

If I uncomment the debug! line I get an error because Framed<TcpStream, LengthPrefixedMessage<ResponseMessage, RequestMessage>> does not implement std::fmt::Display. My understanding is that because I don’t define Framed in my module, I cannot implement Display… is that right?

The borrow issue is with client:

error[E0507]: cannot move out of borrowed content
  --> src/http_server.rs:56:21
   |
56 |                     client.and_then(|c| {
   |                     ^^^^^^ cannot move out of borrowed content

My hope was that wrapping TcpStreamNew in an Rc I could avoid this… do I need to use a Mutex or something?

Thanks!


#14

You should use a std::fmt::Debug based output, which is “{:?}”. Framed and friends usually impl Debug if their underlying generic parts do as well.

I think you’re just missing move: client.and_then(move |c| ...)


#15

Nope, same error:

error[E0507]: cannot move out of borrowed content
  --> src/http_server.rs:56:21
   |
56 |                     client.and_then(move |c| {
   |                     ^^^^^^ cannot move out of borrowed content

#16

Oh sorry, I didn’t notice the Rc there and didn’t look at the error carefully.

Yeah, this isn’t going to work. and_then takes self but you don’t own it - you own the Rc clone but not the inner TcpStreamNew. Rc will only give you a &TcpStreamNew.

You’ll need to redesign this so that ownership is more targeted. One way to use these combinators that take self is to have an owned value to call it on and then you get the value back in the closure. You can then thread it through the future chain and get it back at the end.
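
As a sketch of what “threading ownership through the chain” looks like (futures 0.1 style; addr, handle, and req stand in for your variables):

```
// Own the connect future outright instead of sharing it behind Rc:
let connect = TcpStream::connect(&addr, &handle);

// and_then consumes `connect` (self by value), which is fine because we
// own it; the resolved stream is handed back to us inside the closure.
let fut = connect.and_then(move |stream| {
    stream.framed(ClientCodec::new()).send(req)
});
```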

I’m a bit short on time at the moment but I’ll see if I can help some more later. Wanted to explain the error in the meantime.


#17

I think putting my RPCClient inside a HashMap will prevent me from ever making this work. HashMap is always going to return me &RPCClient (unless I’m missing something), and since that’s only ever borrowed from HashMap, I’ll never have a chance to own the internal connection, and therefore never be able to make this work.

Just not seeing how to make this all work in Rust :frowning:


#18

Let’s rethink the approach :slight_smile:

Instead of storing rpc clients in some container, spawn them onto the reactor and communicate with them via channels (eg the unbounded ones in futures::sync::mpsc).

The idea would be to have the sending half of the channel in the rpc client, and the hyper request handler would send a message over this channel, requesting an rpc call. The receiving half of this channel would be wired up to forward the requests to the underlying socket of the rpc client.

In addition, you’d have another channel for receiving rpc responses. The sending half would be in the rpc client, and the socket read portion would put messages on it. The receiving end would be available to the hyper code as a Stream of responses. The hyper code can return a future from call that completes when it’s able to pull a message off the channel.

https://github.com/jgallagher/tokio-chat-example might be useful to you, particularly the server code there.
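
A rough sketch of that wiring (futures 0.1 mpsc; error handling elided and names illustrative):

```
let (req_tx, req_rx) = mpsc::unbounded::<RequestMessage>();
let (rsp_tx, rsp_rx) = mpsc::unbounded::<ResponseMessage>();

// Spawned on the reactor, one per rpc client: pull requests off the
// channel, make the rpc call, and push each response onto rsp_tx.
handle.spawn(req_rx.for_each(move |req| {
    let rsp_tx = rsp_tx.clone();
    client.call(req)
        .map(move |rsp| { let _ = rsp_tx.unbounded_send(rsp); })
        .map_err(|_| ())
}));

// Hyper side: fire the request, then resolve when a response arrives.
let _ = req_tx.unbounded_send(req);
let response_fut = rsp_rx.into_future().map(|(rsp, _rest)| rsp);
```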


#19

I’ve been looking at https://github.com/jgallagher/tokio-chat-example. The problem is, how do I deal with a reconnect? When I go to write to a TcpStream and it fails, how do I try to reconnect it?


#20

Assuming it actually fails because the remote has disconnected or gone away, you’d create a new rpc client with a fresh TcpStream and spawn that on the reactor with a new channel setup; essentially do the same thing as initial bootstrap. The errored one should be discarded.