Converting a stream of `TcpStream` into a fused stream of requests

I am trying to take the stream of TcpStreams produced by TcpListener::incoming() and turn it into a stream of requests with metadata, so that the handler logic is request-oriented rather than connection-oriented.

My strategy right now is to create a new channel for each client and then flatten all those channels into a single stream. Each channel also needs to drive two separate futures for reading and writing data to the underlying TcpStream, so I'm using take_until on each client's stream. The handler in the select! macro needs a channel to send responses back to the write_fut, so the request sender also sends a clone of the response sender for each request.

However, the stream I'm currently producing doesn't satisfy FusedFuture, so it can't be used with select!. I'm not sure why this is the case.

I'm also not confident I'm using take_until the way it's intended.

Here's my simplified code:

use std::net::SocketAddr;

use futures::{channel::mpsc, future, AsyncReadExt, SinkExt, StreamExt};

use async_std::{
    io::{ReadExt, WriteExt},
    net::TcpListener,
    task,
};

fn main() {
    task::block_on(async { run_test().await });
}

async fn run_test() {
    let tcp_listener = TcpListener::bind("127.0.0.1:8080".parse::<SocketAddr>().unwrap())
        .await
        .unwrap();
    let service_stream = tcp_listener
        .incoming()
        .map(|stream| {
            let stream = stream.unwrap();
            let peer_addr = stream.peer_addr().unwrap();
            let (reader, mut writer) = stream.split();
            let (res_tx, mut res_rx) = mpsc::channel(0);
            let (req_tx, req_rx) = mpsc::channel(0);

            // Future for receiving requests from peer
            let read_fut = async move {
                let req_tx = req_tx.clone();
                reader
                    .bytes()
                    .for_each(|req| {
                        let mut req_tx = req_tx.clone();
                        let res_tx = res_tx.clone();
                        async move {
                            req_tx.send((peer_addr, req, res_tx)).await.unwrap();
                        }
                    })
                    .await;
            };

            // Future for writing responses to peer
            let write_fut = async move {
                while let Some(byte) = res_rx.next().await {
                    writer.write_all(&[byte]).await.unwrap()
                }
            };

            // Stream needs to drive both futures as it produces items
            req_rx.take_until(future::join(read_fut, write_fut))
        })
        .flatten()
        .fuse();
    loop {
        futures::select! {
            (peer_addr, req, tx) = service_stream => {
                println!("got request from: {peer_addr}");
                tx.send(req).await.unwrap() // echo
            }
            // process another stream
        }
    }
}

The stream doesn't implement FusedFuture because streams aren't futures. You need to call next() to get a future that resolves to the next element. There were a couple of other compile errors that needed to be fixed too:

loop {
    futures::select! {
        res = service_stream.next() => {
            let (peer_addr, req, mut tx) = res.unwrap();
            println!("got request from: {peer_addr}");
            tx.send(req).await.unwrap() // echo
        }
        // process another stream
    }
}

The take_until definitely feels a bit strange. I think part of the problem is that you have requests from multiple connections all being polled by a single task. That means only a single thread will be able to service requests at a time. It's more typical for each connection to get a task that services requests from that connection.

If you spawned a task for each connection instead you wouldn't need to tie the read and write futures to the request stream at all in the return value of the map closure.
