How to use Tokio TcpStream in a loop?

Is there a way to create a TcpStream connection and then listen indefinitely in a loop for incoming messages, responding to each one when it arrives?

I have seen several documentation examples of how to do this with a TcpListener, for example:

// https://tokio.rs/blog/2018-03-tokio-runtime/
let server = listener.incoming()
    .map_err(|e| println!("error = {:?}", e))
    .for_each(|socket| {
        tokio::spawn(process(socket))
    });

tokio::run(server);

However, in my use case I cannot listen on a port. I need to open a TcpStream connection to a port, send an initiation message, and then receive and respond to messages.

Thank you for the help

You’d use TcpStream::connect() to establish a connection to a remote. That gives you a Future that resolves to a TcpStream once the connection is established.

Once you have the TcpStream, a common approach is to split() it into read and write halves. With a framed connection, the read half is a Stream and the write half is a Sink. You can then connect the two by mapping (i.e. transforming) inbound data to outbound data. Take a look also at framing, where you layer a decoder/encoder on the connection to turn raw bytes into Rust types and vice versa.

There should be examples in the tokio GitHub demonstrating the above.

Thank you for your response.

I am already using frames and codecs for reading and writing. The problem is writing a loop that receives requests without blocking and responds to each one when its response future is ready.

I could not find adequate examples anywhere, including in the github repo (not to say they do not exist, but I did not find them).

I will try the split method, which I have not tried yet, and see whether it helps. I will report my findings.

Looking at split now, I do not see how that will help my problem.

Take a look at the code starting at https://github.com/jgallagher/tokio-chat-example/blob/master/tokio-chat-client/src/main.rs#L162. Ignore the GUI portion there. Basically, you'll probably want a setup like:

// `tx` is to send frames to server, `rx` is to receive frames from server
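let (tx, rx) = sock.framed(...).split();
let client = rx.map(|msg| {
    // process message from server and generate a reply
}).forward(tx)
  // tokio::run needs a future with Item = () and Error = ()
  .map(|_| ())
  .map_err(|e| println!("error = {:?}", e));
tokio::run(client);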

So your rx would be the way to receive messages, process them, and generate a response. Those responses are then sent to the server via forward(tx).

Although there are ways to create loops using tokio/futures, that's generally not how you'd set up a processing pipeline. Instead, you connect the various pieces (e.g. the incoming Stream of server-sent messages to the Sink sending responses to the server) and then run them on the reactor. Put another way, you set up "instructions" on how to process the data flow using the various combinators on Stream/Sink, and then submit them to the reactor.

Thank you for the detailed example, I will definitely give this a try.

In the above example, the "process message from server" part only works if the processing is synchronous, for example if random_process_message_func returns a u32. If I want random_process_message_func to return a Future<Item = u32> instead, what would I need to change to get it to compile?

let (tx, rx) = sock.framed(...).split();
let client = rx.map(|msg| {
    random_process_message_func(msg)
}).forward(tx);
tokio::run(client);

You’d replace rx.map(...) with rx.and_then(...) (or then()).

Thanks again!

Perhaps my understanding is incorrect, but I noticed that the tokio documentation recommends using tokio::spawn so that tasks run on an executor (thread pool) instead of on the current thread.

The below code that I have works, but should I be using tokio::spawn somewhere for greater efficiency/correctness? If so, I cannot figure out how/where to use it.

Please let me know.

        let codec = MyCodec::new();
        let connection = self.connection; // is tokio tcpstream
        let mut processor = self.request_processor; 

        let initial = connection.framed(codec);

        let (tx, rx) = initial.split();

        let client = rx.and_then(move |msg: MyMessage| {
            process_message(msg, &mut processor)
                .map_err(|e| e.into())
        })
        .forward(tx)
        .map(|_| ())
        .map_err(|_| ());

        tokio::run(client);

spawn is useful to submit a given future state machine to the executor so that it can proceed independently of any other future scheduled in the runtime. This would be considered a “background” state machine. The canonical example is a TCP server that accepts connections. The main accept loop is the root future that’s run(). Each accepted connection is independent of all others, and they get spawn()d to run independently.

In your case you have a single TCP connection (you’re writing a client, right?) that is its own state machine. I don’t think there’s anything to spawn here.

Thank you for the explanation.

The above example that I have works perfectly for processing messages in a loop.

However, how can I perform a handshake before beginning the loop?

For example, I would like to do the following:

  1. Read initial message.
  2. Send ack response.
  3. Begin loop as implemented above.

I have been trying to get something to compile with no luck, and since @vitalyd has been so helpful, I figured I'd ask again.

You will want something like:

let initial = connection.framed(codec);
// into_future() allows us to peel off
// a single received message, and then get the 
// `initial` back
let handshake = initial.into_future()
   .map_err(|(e, _)| e) // keep the error, drop the conn
   .and_then(|(msg, io)| {
   // msg is Option<YourMsgType>.  It’s an Option
   // because the peer may have terminated the
   // connection without sending anything.
   // `io` is the framed connection given back to you
   // handle message somehow
   // send request
   io.send(...)
});

// handshake is now a future chain that completes 
// when the request is sent, and returns you back
// the framed.  We’ll tack on our loop as a continuation
let client = handshake.and_then(move |conn| {
    // `conn` is the same value as `initial` above
    let (tx, rx) = conn.split();

    rx.and_then(move |msg: MyMessage| {
        process_message(msg, &mut processor)
            .map_err(|e| e.into())
    })
    .forward(tx)
})
// convert to Item = () and Error = () here, outside and_then, so the
// inner future keeps the same error type as `handshake`
.map(|_| ())
.map_err(|_| ());
tokio::run(client);
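
To make the three steps concrete (read one message, acknowledge it, then enter the loop on the same connection), here is a blocking sketch that uses only std::net, not Tokio. It is an analogy for the shape of the future chain above, not a replacement for it. The newline-delimited protocol, the "ACK" and "reply:" formats, and the `demo` helper that plays the remote peer are all made up for illustration. Taking one item with `lines.next()` plays the role of into_future(): it yields an Option because the peer may close before sending anything, and the rest of the iterator is still available afterwards.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::thread;

// Blocking, std-only sketch: handshake first, then the reply loop.
fn run_client(addr: SocketAddr) -> io::Result<()> {
    let mut writer = TcpStream::connect(addr)?;
    let mut lines = BufReader::new(writer.try_clone()?).lines();

    // Step 1: peel off exactly one message. `None` means the peer closed
    // the connection without sending anything.
    let first = match lines.next() {
        Some(line) => line?,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "peer closed before handshake",
            ))
        }
    };

    // Step 2: send the acknowledgement (hypothetical wire format).
    writeln!(writer, "ACK {}", first)?;

    // Step 3: the regular loop continues on the same connection.
    for line in lines {
        let msg = line?;
        writeln!(writer, "reply:{}", msg)?;
    }
    Ok(())
}

// Hypothetical test peer on a loopback port: sends a handshake line and two
// requests, then returns every line the client wrote back.
fn demo() -> Vec<String> {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let peer = thread::spawn(move || {
        let (mut sock, _) = listener.accept().unwrap();
        sock.write_all(b"hello\nping\npong\n").unwrap();
        // Signal end of input so the client's loop terminates.
        sock.shutdown(Shutdown::Write).unwrap();
        BufReader::new(sock)
            .lines()
            .map(|l| l.unwrap())
            .collect::<Vec<_>>()
    });
    run_client(addr).unwrap();
    peer.join().unwrap()
}
```

Calling `demo()` from a `main` function returns the lines the peer received. The acknowledgement always comes before any reply because the loop is not entered until the handshake has completed.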

Thank you very much! You are like the Santa Claus of Futures/Tokio explanations.

No problem - happy to help.

:laughing:

I am not sure that this actually works in a non-blocking, asynchronous way. I am running a simple test with two messages. The first message is given an artificial IO delay and the second is not. The expected result is that the reply to the second message is returned first, followed by the reply to the first.

I have been trying to delay the first message by 1000 ms inside my process_message method, both with tokio_timer (supposedly) and with a oneshot channel whose sender is spawned on a different thread that does a synchronous sleep.

Nevertheless, it seems that process_message never actually gets called for the second message until the first future is returned. This is not the behavior I was expecting.

Is there a way to allow this to actually respond to messages out of order, depending on when the future for process_message completes?

The client future chain we had above is one big "task" that tokio will run. If you block or delay any part of that pipeline, it cannot process the next available message read from the socket. If you want requests and responses to run independently, you'll need to spawn() them as individual futures (and thus tasks) that run concurrently rather than as part of one larger chain.

Thank you for the quick response as always. That makes sense, I will experiment with doing that instead.

Where exactly would I need to put spawn in the above example? We have process_message, which returns a future, but if I were to call spawn on it, how would I make sure that the result is forwarded into the tx?

I have been running into a lot of issues with something similar lately, and would appreciate the help.
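
One way to get replies out of order is to hand each request to its own task and funnel the finished replies through a channel to a single writer that owns the write half. The sketch below shows that shape with std threads and std::sync::mpsc instead of Tokio, so it is an analogy rather than Tokio code. The `process_message` stand-in, the newline-delimited protocol, and the `demo` helper that plays the server are all made up for illustration. In a Tokio program the same structure would presumably use tokio::spawn per request and a futures channel whose receiving end is forwarded into tx.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for `process_message`: the request "slow" takes
// 300 ms to answer, everything else is answered immediately.
fn process_message(msg: &str) -> String {
    if msg == "slow" {
        thread::sleep(Duration::from_millis(300));
    }
    format!("reply:{}", msg)
}

// Reads newline-delimited requests, processes each on its own thread, and
// writes replies in the order they finish, not the order they arrived.
fn run_client(stream: TcpStream) -> io::Result<()> {
    let reader = BufReader::new(stream.try_clone()?);
    let mut writer = stream;
    let (reply_tx, reply_rx) = mpsc::channel::<String>();

    // Single writer: the only code that touches the write half.
    let writer_thread = thread::spawn(move || -> io::Result<()> {
        for reply in reply_rx {
            writeln!(writer, "{}", reply)?;
        }
        Ok(())
    });

    for line in reader.lines() {
        let msg = line?;
        let reply_tx = reply_tx.clone();
        // One independent worker per request.
        thread::spawn(move || {
            let _ = reply_tx.send(process_message(&msg));
        });
    }

    // Drop the sender held here so the writer loop ends once every worker
    // has sent its reply.
    drop(reply_tx);
    writer_thread.join().expect("writer thread panicked")
}

// Hypothetical test server on a loopback port: sends a slow request followed
// by a fast one and returns the replies in the order they arrive.
fn demo() -> Vec<String> {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let server = thread::spawn(move || {
        let (mut sock, _) = listener.accept().unwrap();
        sock.write_all(b"slow\nfast\n").unwrap();
        sock.shutdown(Shutdown::Write).unwrap();
        BufReader::new(sock)
            .lines()
            .map(|l| l.unwrap())
            .collect::<Vec<_>>()
    });
    run_client(TcpStream::connect(addr).unwrap()).unwrap();
    server.join().unwrap()
}
```

The key design choice is that workers never write to the socket themselves. They only send into the channel, so replies cannot interleave on the wire and the read loop is free to pick up the next request while earlier ones are still being processed.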