How to create a reusable client with Tokio TcpStream?

I am trying to create an async client with an underlying tokio/tcpstream implementation. However, I am unsure how to actually make the client reusable, so that the user can instantiate it once, and then use it for several requests throughout the lifetime of the application.

The code below does not compile because of the send_message function: sending a message with framed consumes the Framed itself, and I only have access to it through &self.

What is the proper approach to make a client in a reusable way?

Thanks for the help.

type FunFramed = Framed<TcpStream, FunMessageCodec>;

pub struct FunClient {
    framed: FunFramed,
}

impl FunClient {
    pub fn connect() -> impl Future<Item=FunClient, Error=IoError> {
        TcpStream::connect(&SocketAddr::new("127.0.0.1".parse().unwrap(), CLIENT_PORT))
            .map_err(|e| e.into())
            .map(move |stream| {
                let codec = FunMessageCodec::new();
                let framed = stream.framed(codec);

                FunClient { framed }
            })
    }

    pub fn send_message(&self, message: Message) -> impl Future<Item=FunFramed, Error=IoError> {
        println!("MESSAGE: {:#?}", message);
        self.framed.send(message)
    }
}

Additionally, if I were to send multiple messages (expecting responses for each message) before one or more of them resolve, how can I tell which response corresponds to which request?

Unfortunately, I am still stuck on this problem. Perhaps to help I can show what I would like to do in JS to clarify:

'use strict';
// express is a JS HTTP server framework
const express = require('express');
const app = express();


// imaginary Db Driver that in Rust would be implemented with tokio tcpstream 
class DbDriver {
    // return a Promise (Future like abstraction)
    execute(query) {
        // imagine that I am asynchronously reaching out to the underlying
        // db connection, and returning the following based on the query
        return Promise.resolve([1, 2, 3, 4]);
    }
}


// instantiate db client to be shared throughout multiple requests
// this is where the "reusable" comes into play
const dbDriverClient = new DbDriver();

app.get(
    '/',
    (req, res) => {
        // execute a request with the Db Client, and based on async result, send response back to client
        dbDriverClient.execute(req.query)
            .then((result) => res.send(result));
    }
)

app.listen(3000, () => console.log('Example app listening on port 3000!'))

In Rust, what I would like is a client with an execute function that returns a Future<Item=Response>, much like how the JS code returns a Promise.

However, if I look at something like futures::sink::Sink, its send method does not return a Response-like future; it simply returns Send<Self>.

I would appreciate any advice/help I can get, or any clear examples on how to make a client that acts in this way.

Thank you.

Writing a client of the variety you're interested in is ... pretty difficult, which is likely the reason for lack of replies to this thread.

You may want to look at hyper to see how it does it. This is the get() entrypoint. There's a lot of stuff happening in its client, but perhaps you can get the big picture from looking at it.

How to handle multiple requests is one design decision you'll need to make. If a caller submits a request, how do you want to handle subsequent ones? You'll likely want to buffer, but then you'll need to decide how much buffering you want. If you want no buffering, you could have send_message take self (i.e. consume the client) and return it in the future that also carries the response. This is a fairly natural way to ensure there's only ever a single outstanding request per client.
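
To make the "consume the client and hand it back" idea concrete, here is a minimal std-only sketch. It uses blocking std::io instead of futures, and the names OneShotClient and Loopback, as well as the one-length-byte framing, are assumptions made up for illustration. In futures 0.1 the return type would presumably be a future whose Item is the (client, response) pair; the ownership pattern is the same.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};

/// Hypothetical client that owns its transport. `T` stands in for the
/// TcpStream (or Framed) from the question.
pub struct OneShotClient<T> {
    transport: T,
}

impl<T: Read + Write> OneShotClient<T> {
    pub fn new(transport: T) -> Self {
        OneShotClient { transport }
    }

    /// Takes `self` by value: the caller gets the client back only together
    /// with the response, so a second request cannot start before then.
    pub fn send_message(mut self, message: &[u8]) -> io::Result<(Self, Vec<u8>)> {
        // Assumed framing for this sketch: one length byte, then the payload.
        if message.len() > usize::from(u8::MAX) {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "message too long"));
        }
        self.transport.write_all(&[message.len() as u8])?;
        self.transport.write_all(message)?;
        self.transport.flush()?;

        let mut len = [0u8; 1];
        self.transport.read_exact(&mut len)?;
        let mut response = vec![0u8; usize::from(len[0])];
        self.transport.read_exact(&mut response)?;
        Ok((self, response))
    }
}

/// Hypothetical in-memory transport that echoes back whatever was written.
#[derive(Default)]
pub struct Loopback {
    bytes: VecDeque<u8>,
}

impl Write for Loopback {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.bytes.extend(buf.iter().copied());
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Read for Loopback {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.bytes.len());
        for slot in buf.iter_mut().take(n) {
            *slot = self.bytes.pop_front().expect("n bytes are available");
        }
        Ok(n)
    }
}
```

A caller would write something like `let (client, reply) = client.send_message(b"ping")?;` and reuse the returned client for the next request. The compiler rejects any attempt to use the old binding while a request is outstanding.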

If you want to have multiple requests in-flight concurrently, then you'll likely want to associate some correlation id with each request which the server also replies with. When a complete response has been decoded, you'd find the corresponding request and complete it with the response.
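
The bookkeeping for correlation ids could look roughly like the sketch below. It is std-only: Pending, register and complete are hypothetical names, responses are plain Strings, and std::sync::mpsc channels stand in for what would more likely be futures::sync::oneshot in a futures 0.1 client.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical table of in-flight requests, keyed by correlation id.
pub struct Pending {
    next_id: u64,
    waiting: HashMap<u64, Sender<String>>,
}

impl Pending {
    pub fn new() -> Self {
        Pending { next_id: 0, waiting: HashMap::new() }
    }

    /// Allocates a fresh id for an outgoing request and returns the receiving
    /// end on which the matching response will eventually show up.
    pub fn register(&mut self) -> (u64, Receiver<String>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Called when a response carrying `id` has been decoded. Returns false
    /// if no request is waiting under that id (unknown or already completed)
    /// or if the caller has stopped listening.
    pub fn complete(&mut self, id: u64, response: String) -> bool {
        match self.waiting.remove(&id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }
}
```

The id returned by register would be written into the request frame, and the server is assumed to echo it back in the response so that complete can find the right waiter even when responses arrive out of order.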

As you probably suspect by now, your FunClient won't have a Framed or any I/O object directly inside it. Instead, it'll be a shim over some channel or other mechanism by which you submit I/O to the executor in use.
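
A rough std-only sketch of such a shim: the handle below holds only a channel sender, while a background thread owns the I/O. ClientHandle, Job and the `io` closure are hypothetical stand-ins; in a tokio client the worker would be a task spawned on the executor, and execute would return a future rather than a Receiver.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// A request paired with the channel on which its response is delivered.
type Job = (String, Sender<String>);

/// Hypothetical client handle: cheap to clone, holds no I/O object itself.
#[derive(Clone)]
pub struct ClientHandle {
    jobs: Sender<Job>,
}

impl ClientHandle {
    /// Spawns the worker that owns the I/O. `io` is a stand-in for "write the
    /// request to the socket and read the response back".
    pub fn spawn<F>(mut io: F) -> ClientHandle
    where
        F: FnMut(String) -> String + Send + 'static,
    {
        let (jobs, inbox) = channel::<Job>();
        thread::spawn(move || {
            // Runs until every ClientHandle (and thus every sender) is dropped.
            for (request, reply) in inbox {
                // The caller may have given up waiting; that is not an error here.
                let _ = reply.send(io(request));
            }
        });
        ClientHandle { jobs }
    }

    /// Submits a request and returns where the response will arrive.
    /// If the worker is gone, `recv()` on the result reports an error.
    pub fn execute(&self, request: &str) -> Receiver<String> {
        let (reply_tx, reply_rx) = channel();
        let _ = self.jobs.send((request.to_string(), reply_tx));
        reply_rx
    }
}
```

Because each request carries its own reply channel, callers on different clones of the handle can wait for their responses independently, which is close to the shape of the dbDriverClient.execute call in the JS example.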

But take a look at hyper's code and see if things become a bit clearer. There may be other client libs out there with a similar API to what you want, but nothing else springs to mind that I could point you to. Maybe others know something ...

Oh, and maybe @seanmonstar can offer some high-level advice based on his "in the trenches" experience ...

Thank you for the response. I'll look at hyper to see if I can gain some clarity.

Even after looking at other code samples, I am still struggling to come up with a working solution. For what it's worth, I'm generally comfortable with all other aspects of Rust, but the futures/tokio portion is particularly difficult for me.

I think conceptually what I need to do is the following:

  1. Get a framed stream, and then split it into reader and writer parts.

  2. Create a way to actually funnel requests into the framed writer. I would create a channel, where the receiver's items would be sent by the framed writer, and the sender would be stored on my client struct. That way, when I want to dispatch a request, I would send an item through the sender portion of the channel.

  3. Create a second channel, so that the framed reader can funnel items into this channel's sender. This channel's receiver would be stored on the struct, that way somehow it can be used to return a Future to my theoretical client's send request.
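
The three steps above can be sketched with std threads and channels in place of tokio. Everything here is an assumption for illustration: wire_out and wire_in play the writer and reader halves of the split framed stream, spawn_echo_server fakes the remote end, and the client assumes one in-order response per request.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical client holding only channel ends, as in steps 2 and 3.
pub struct ChannelClient {
    requests: Sender<String>,    // step 2: drained into the writer half
    responses: Receiver<String>, // step 3: filled from the reader half
}

/// Step 1 is assumed done: `wire_out` and `wire_in` stand in for the two
/// halves of the split framed stream.
pub fn spawn_client(wire_out: Sender<String>, wire_in: Receiver<String>) -> ChannelClient {
    let (req_tx, req_rx) = channel::<String>();
    let (resp_tx, resp_rx) = channel::<String>();

    // Writer pump: forwards queued requests to the wire.
    thread::spawn(move || {
        for request in req_rx {
            if wire_out.send(request).is_err() {
                break;
            }
        }
    });

    // Reader pump: forwards decoded responses to the client.
    thread::spawn(move || {
        for response in wire_in {
            if resp_tx.send(response).is_err() {
                break;
            }
        }
    });

    ChannelClient { requests: req_tx, responses: resp_rx }
}

impl ChannelClient {
    /// Sends one request and waits for the next response; returns None if
    /// either pump has shut down.
    pub fn request(&self, message: &str) -> Option<String> {
        self.requests.send(message.to_string()).ok()?;
        self.responses.recv().ok()
    }
}

/// Hypothetical remote end that echoes each message back with a prefix.
pub fn spawn_echo_server() -> (Sender<String>, Receiver<String>) {
    let (to_server, from_client) = channel::<String>();
    let (to_client, from_server) = channel::<String>();
    thread::spawn(move || {
        for message in from_client {
            if to_client.send(format!("echo: {}", message)).is_err() {
                break;
            }
        }
    });
    (to_server, from_server)
}
```

With a single shared response channel, the client cannot tell responses apart once several requests are in flight, which is where a correlation id or a per-request reply channel becomes necessary.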

Assuming that the above is actually the right thing to do (and it may be wrong), I am having trouble actually implementing this. Even the following partial test fails to compile, and I am not sure why:

fn main() {
    let codec = tokio_codec::BytesCodec::new();

    let client = TcpStream::connect(&"127.0.0.1:12345".parse().unwrap())
        .wait()
        .unwrap();

    let framed = client.framed(codec);

    let (tx, rx) = mpsc::unbounded();

    let action = rx
        .forward(framed);
        

    let bytes_to_send = vec![3, 5, 6].into();

    tx.unbounded_send(bytes_to_send).unwrap();

    tokio::run(action);
}

with errors like

25 |         .forward(framed);
   |          ^^^^^^^ expected struct `std::io::Error`, found ()
   |
   = note: expected type `std::io::Error`
              found type `()`

Please let me know if my assumptions are right, and what is causing the compilation error I am seeing.

The compilation error is because tokio::run() wants a Future<Item = (), Error = ()> - the important bit is the Error type. rx.forward(framed) generates a future that returns Error = std::io::Error, and the mismatch is the error.

The typical way to handle this is to map_err(|e| ...) the error to (), probably logging or doing some other error notification inside there.

Thank you for your quick response. In the above case, I have tried different permutations of mapping the future into the Item = (), Error = () shape that tokio::run expects, but I still get two compilation errors.

    let action = rx
        .forward(framed)
        .map(|_|())
        .map_err(|_|());

error[E0271]: type mismatch resolving `<tokio_io::framed::Framed<tokio::net::TcpStream, tokio_codec::BytesCodec> as futures::Sink>::SinkError == ()`
  --> src/main.rs:24:10
   |
24 |         .forward(framed)
   |          ^^^^^^^ expected struct `std::io::Error`, found ()
   |
   = note: expected type `std::io::Error`
              found type `()`

error[E0599]: no method named `map` found for type `futures::stream::Forward<futures::sync::mpsc::UnboundedReceiver<bytes::bytes::Bytes>, tokio_io::framed::Framed<tokio::net::TcpStream, tokio_codec::BytesCodec>>` in the current scope
  --> src/main.rs:25:10
   |
25 |         .map(|_|())
   |          ^^^
   |

Oh, there's a related concept on Sink - Sink has two associated types, SinkItem and SinkError, which are analogs to Future::Item and Future::Error. In that spirit, there's an analogous sink_map_err() combinator. So something like this:

// `rx` is a `Stream<..., Error = ()>` - forward() is only available
// if () can be created from `framed`'s SinkError, which is std::io::Error.
// That's obviously not the case.
// So we map `framed`'s SinkError to be (), and then they match up
let action = rx
    .forward(framed.sink_map_err(|_| ())); // add some logging or whatever in the closure
...
// `action` is a `Future` that returns a tuple of the stream and the sink that were "fused" together,
// but `tokio::run()` wants a future that returns () - so we map the result here
tokio::run(action.map(|_| ()));

The important thing to do when working with futures, streams, sinks, etc is to look at the generic trait bounds on the various implementations. For example, some struct Foo<X, Y, Z> implements, say, Stream only when X, Y, and/or Z themselves implement certain traits. Foo is otherwise constructable with arbitrary X, Y, and Z, but it just won't implement the important traits. The message you get in those cases can be startling, as it'll claim that some method foo (e.g. a combinator on Future) is not available.

When you see an error like that, go back to the types involved and see if their type parameters align with what the trait bounds expect.
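
Here is a small std-only sketch of that effect. MiniSink, Forward, SinkMapErr and VecSink are hypothetical, heavily simplified stand-ins and not the futures API; the point is only that run exists when `(): From<K::SinkError>` holds and is unavailable otherwise.

```rust
/// Hypothetical, much-simplified stand-in for futures' `Sink`.
pub trait MiniSink {
    type SinkError;
    fn push(&mut self, item: u8) -> Result<(), Self::SinkError>;
}

/// Stand-in for `Forward`: any `K` is accepted at construction time.
pub struct Forward<K> {
    items: Vec<u8>,
    sink: K,
}

impl<K> Forward<K> {
    pub fn new(items: Vec<u8>, sink: K) -> Self {
        Forward { items, sink }
    }
}

// `run` only exists when the bounds hold, mirroring how a combinator can
// seem to disappear when the error types do not line up.
impl<K> Forward<K>
where
    K: MiniSink,
    (): From<K::SinkError>,
{
    pub fn run(self) -> Result<K, ()> {
        let Forward { items, mut sink } = self;
        for item in items {
            sink.push(item)?; // `?` relies on `(): From<K::SinkError>`
        }
        Ok(sink)
    }
}

/// A sink whose error type is String, so `(): From<String>` does not hold.
#[derive(Default)]
pub struct VecSink {
    pub items: Vec<u8>,
}

impl MiniSink for VecSink {
    type SinkError = String;
    fn push(&mut self, item: u8) -> Result<(), String> {
        self.items.push(item);
        Ok(())
    }
}

/// Adapter in the spirit of `sink_map_err`: rewrites the error type.
pub struct SinkMapErr<K, F> {
    pub inner: K,
    pub f: F,
}

impl<K, F, E> MiniSink for SinkMapErr<K, F>
where
    K: MiniSink,
    F: FnMut(K::SinkError) -> E,
{
    type SinkError = E;
    fn push(&mut self, item: u8) -> Result<(), E> {
        self.inner.push(item).map_err(&mut self.f)
    }
}
```

`Forward::new(vec![1], VecSink::default())` constructs fine, but calling `.run()` on it is rejected because `(): From<String>` is not satisfied. Wrapping the sink as `SinkMapErr { inner: VecSink::default(), f: |_e: String| () }` changes SinkError to (), and `.run()` becomes available.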

Thank you, I will look at the sink_map_err function, and I agree with your advice in general about the type documentation.

Thanks to @jonhoo's video tutorials on async tokio with ZooKeeper here, I was finally able to get a working solution. His code can be found here.

Hey, I'm about to try to do the exact thing that you were doing. Before I watch that several-hour-long video, I have a few questions. Did you stick with the idea that you described with the channels? And do you have any tips before I do this as well?

I did end up using that solution, and it worked for me, so I believe it could work for you too.

As for the video, I think it is valuable, since it helped me understand more about futures and I would recommend watching it, even though it is very long. (I believe I watched it on 1.25 or 1.5 speed, which might help).