How to use FuturesUnordered to endlessly read from N tokio::net::TcpStream

I am looking to create a server that sits between two separate processes. One of them is the "server" and then N number of "clients". Both of are tokio::net::TcpStreams. Any message from the server goes through some logic to figure out which client it should be routed to. Any message from any of the N clients is forwarded to the server. With the amount of traffic that I will ever reasonably expect, this should be easily handled by a single task, so no need to spawn multiple tokio tasks and have to use several Arc>'s. I will be using tokio AsyncReadExt and AsyncWriteExt which are both cancellation safe so I feel rather safe there. I imagine I will end up with some code in the format of

use anyhow; // 1.0.71
use bytes::{Bytes, BytesMut};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio; // 1.28.0
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    //Setup all the client and server connections here
    //Somehow keep FuturesUnordered endlessly populated with clients.read_buf(&mut client_buf)
    let mut client_streams = vec![
        TcpStream::connect("127.0.0.1:1234").await?,
        TcpStream::connect("127.0.0.1:1235").await?,
    ];
    let mut server_stream = TcpStream::connect("127.0.0.1:2345").await?;
    let mut server_buf = BytesMut::with_capacity(1024);
    let mut client_futures = client_streams
        .iter_mut()
        .map(|stream| async move {
            let mut buf = BytesMut::with_capacity(1024);
            stream.read_buf(&mut buf).await.unwrap();
        })
        .collect::<FuturesUnordered<_>>();

    tokio::select! {
        _ = server_stream.read_buf(&mut server_buf) => {
            println!("Server msg came in");
            //handle logic of figuring out which client to write to and just forward the raw bytes for now.
        }
        _ = client_futures.next() => {
           //A message came in from one of the clients, forward the raw bytes to the server
           //The problem I see here is that client_futures will run out.  I do I keep client_futures endlessly reading from the clients.
           println!("Client message came in");
        }
    }
    Ok(())
}

(Playground)

Errors:

   Compiling playground v0.0.1 (/playground)
warning: unused import: `Bytes`
 --> src/lib.rs:2:13
  |
2 | use bytes::{Bytes, BytesMut};
  |             ^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: unused import: `AsyncWriteExt`
 --> src/lib.rs:6:31
  |
6 | use tokio::io::{AsyncReadExt, AsyncWriteExt};
  |                               ^^^^^^^^^^^^^

warning: function `main` is never used
 --> src/lib.rs:9:10
  |
9 | async fn main() -> anyhow::Result<()> {
  |          ^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: `playground` (lib) generated 3 warnings (run `cargo fix --lib -p playground` to apply 2 suggestions)
    Finished dev [unoptimized + debuginfo] target(s) in 0.51s

So I do I endlessly keep reading from the client TcpStreams in the FutureUnordered? This does at least currently compile in rust playground.

It looks like you're actually connecting one server to multiple other servers, since your program initiates the connections to all other parties. Is this what you want? Or should clients be able to initiate the connection to your program?

There's nothing wrong with using one task per client in this case, as I think it would make the code's logic clearer. If you don't have many connections, you can run all the tasks on a single worker thread.

Generally, the way you normally do this is by spawning a task per TcpStream. You can read more about that way of doing things here: Actors with Tokio – Alice Ryhl

Yes. One of those connections is a special server connection, the others are clients. In the real application this in between server implements some logic and then forwards the traffic along to the special server connection. Yes, I want/need this server to initiate all the connections, because the system cannot work if this process isn't running in the middle. I originally had it implemented as multiple tasks. But I was trying to get away from Arc::<Mutex::> everywhere. So that is where I was trying to simplify things and get down to a single task.

What you could do, if your program is only going to be very small, is use a loop {} in your main function where on every iteration you try_read() from all your connections and do your dispatching of messages. No need to mess around with FuturesUnordered.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.