I am looking to create a server that sits between two separate processes. One of them is the "server" and then N number of "clients". Both of are tokio::net::TcpStreams. Any message from the server goes through some logic to figure out which client it should be routed to. Any message from any of the N clients is forwarded to the server. With the amount of traffic that I will ever reasonably expect, this should be easily handled by a single task, so no need to spawn multiple tokio tasks and have to use several Arc>'s. I will be using tokio AsyncReadExt and AsyncWriteExt which are both cancellation safe so I feel rather safe there. I imagine I will end up with some code in the format of
use anyhow; // 1.0.71
use bytes::{Bytes, BytesMut};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio; // 1.28.0
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
//Setup all the client and server connections here
//Somehow keep FuturesUnordered endlessly populated with clients.read_buf(&mut client_buf)
let mut client_streams = vec![
TcpStream::connect("127.0.0.1:1234").await?,
TcpStream::connect("127.0.0.1:1235").await?,
];
let mut server_stream = TcpStream::connect("127.0.0.1:2345").await?;
let mut server_buf = BytesMut::with_capacity(1024);
let mut client_futures = client_streams
.iter_mut()
.map(|stream| async move {
let mut buf = BytesMut::with_capacity(1024);
stream.read_buf(&mut buf).await.unwrap();
})
.collect::<FuturesUnordered<_>>();
tokio::select! {
_ = server_stream.read_buf(&mut server_buf) => {
println!("Server msg came in");
//handle logic of figuring out which client to write to and just forward the raw bytes for now.
}
_ = client_futures.next() => {
//A message came in from one of the clients, forward the raw bytes to the server
//The problem I see here is that client_futures will run out. I do I keep client_futures endlessly reading from the clients.
println!("Client message came in");
}
}
Ok(())
}
Errors:
Compiling playground v0.0.1 (/playground)
warning: unused import: `Bytes`
--> src/lib.rs:2:13
|
2 | use bytes::{Bytes, BytesMut};
| ^^^^^
|
= note: `#[warn(unused_imports)]` on by default
warning: unused import: `AsyncWriteExt`
--> src/lib.rs:6:31
|
6 | use tokio::io::{AsyncReadExt, AsyncWriteExt};
| ^^^^^^^^^^^^^
warning: function `main` is never used
--> src/lib.rs:9:10
|
9 | async fn main() -> anyhow::Result<()> {
| ^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: `playground` (lib) generated 3 warnings (run `cargo fix --lib -p playground` to apply 2 suggestions)
Finished dev [unoptimized + debuginfo] target(s) in 0.51s
So I do I endlessly keep reading from the client TcpStreams in the FutureUnordered? This does at least currently compile in rust playground.