How to use Tokio TcpStream in a loop?


#1

Is there a way to create a TcpStream connection and then indefinitely listen in a loop for incoming messages, and then responding when they arrive?

I have seen several documentation examples on how to do it with a TcpListener, for ex,

// https://tokio.rs/blog/2018-03-tokio-runtime/
let server = listener.incoming()
    .map_err(|e| println!("error = {:?}", e))
    .for_each(|socket| {
        tokio::spawn(process(socket))
    });

tokio::run(server);

However, in my specific use case, I cannot listen on a port, I need to form a TcpStream connection to a port, send an initiation message, and then afterwards, I need to receive/respond to messages.

Thank you for the help


#2

You’d use TcpStream::connect() to establish a connection to a remote. That gives you a Future that resolves to a TcpStream once the connection is established.

Once you have the TcpStream, a common approach is to split() it into read and write halves. The read portion is a Stream and the write is a Sink. You can then connect the two by mapping (ie transforming) inbound data to outbound data. Take a look also at framing, where you can layer a decoder/encoder to turn raw bytes into Rust types and vice versa.

There should be examples in the tokio GitHub demonstrating the above.


#3

Thank you for your response.

I am already attempting to use frames + codecs with the read/write, the problem I’m having is to have a loop in which I can receive nonblocking requests and then respond to them when my response futures are ready.

I could not find adequate examples anywhere, including in the github repo (not to say they do not exist, but I did not find them).

I will try to use the split method, (which I did not try), and see if that perhaps helps my problem. I will report my findings.


#4

Looking at split now, I do not see how that will help my problem.


#5

Take a look at code starting https://github.com/jgallagher/tokio-chat-example/blob/master/tokio-chat-client/src/main.rs#L162. Ignore the GUI portion there. Basically, you’ll probably want a setup like:

// `tx` is to send frames to server, `rx` is to receive frames from server
let (tx, rx) = sock.framed(...).split();
let client = rx.map(|msg| {
    // process message from server and generate a reply;
}).forward(tx);
tokio::run(client);

So your rx would be the way to receive messages, process them, and generate a response. Those responses are then sent to the server via forward(tx).

Although there are ways to create loops using tokio/futures, that’s generally not how you’d set up a processing pipeline. Instead, you connect the various pieces (e.g. the incoming Stream of server-sent messages to the Sink sending responses to the server) and then run them on the reactor. Put another way, you set up “instructions” on how to process the data flow using the various combinators on Stream/Sink, and then submit them to the reactor.


#6

Thank you for the detailed example, I will definitely give this a try.


#8

In the above example, the process message from server portion of the code only works if the thing I am doing is synchronous, for example, if random_process_message_func returned u32. If I want the random_process_message_func to return Future<u32, _>, what would I need to do to get it to compile?

let (tx, rx) = sock.framed(...).split();
let client = rx.map(|msg| {
    random_process_message_func(msg)
}).forward(tx);
tokio::run(client);

#9

You’d replace rx.map(...) with rx.and_then(...) (or then()).


#10

Thanks again!


#11

Perhaps my understanding is incorrect, but I noticed in the documentation for tokio they recommend using tokio::spawn, so that the executed tasks are done on a executor (threadpool) instead of on the current thread.

The below code that I have works, but should I be using tokio::spawn somewhere for greater efficiency/correctness? If so, I cannot figure out how/where to use it.

Please let me know.

        let codec = MyCodec::new();
        let connection = self.connection; // is tokio tcpstream
        let mut processor = self.request_processor; 

        let initial = connection.framed(codec);

        let (tx, rx) = initial.split();

        let client = rx.and_then(move |msg: MyMessage| {
            process_message(msg, &mut processor)
                .map_err(|e| e.into())
        })
        .forward(tx)
        .map(|_| ())
        .map_err(|_| ());

        tokio::run(client);

#12

spawn is useful to submit a given future state machine to the executor so that it can proceed independently of any other future scheduled in the runtime. This would be considered a “background” state machine. The canonical example is a TCP server that accepts connections. The main accept loop is the root future that’s run(). Each accepted connection is independent of all others, and they get spawn()d to run independently.

In your case you have a single tcp connection (you’re writing a client right?) that is its own state machine - I don’t think there’s anything to spawn here.


#13

Thank you for the explanation.


#14

The above example that I have works perfectly for processing messages in a loop.

However, how can I perform a handshake before beginning the loop?

For example, I would like to do the following:

  1. Read initial message.
  2. Send ack response.
  3. Begin loop as implemented above.

I have been trying to get something to compile with no luck, and since @vitalyd has been so helpful, I figured I’d ask again.


#15

You will want something like:

let initial = connection.framed(codec);
// into_future() allows us to peel off
// a single received message, and then get the 
// `initial` back
let handshake = initial.into_future()
   .map_err(|(e, _)| e) // keep the error, drop the conn
   .and_then(|(msg, io)| {
   // msg is Option<YourMsgType>.  It’s an Option
   // because the peer may have terminated the
   // connection without sending anything.
   // `io` is the framed connection given back to you
   // handle message somehow
   // send request
   io.send(...)
});

// handshake is now a future chain that completes 
// when the request is sent, and returns you back
// the framed.  We’ll tack on our loop as a continuation
let client = handshake.and_then(|conn| {
    // `conn` is the same value as `initial` above
     let (tx, rx) = conn.split();

      rx.and_then(move |msg: MyMessage| {
            process_message(msg, &mut processor)
                .map_err(|e| e.into())
        })
        .forward(tx)
        .map(|_| ())
        .map_err(|_| ())
};
tokio::run(client)

#16

Thank you very much! You are like the Santa Claus of Futures/Tokio explanations.


#17

No problem - happy to help.

:laughing: