Buffering read and write half using Tokio, and recombining them

I have written the following code to spawn servers with a handler method:

use std::error::Error as StdError;
use std::io::Error as IoError;

#[async_trait]
pub trait Server: Send + Sync + 'static {
    async fn handler<R, W>(&self, reader: R, writer: W) -> Result<(), Box<dyn StdError>>
    where
        R: AsyncBufRead + Unpin + Send,
        W: AsyncWrite + Unpin + Send;

    fn buffer_and_spawn<C>(self: &Arc<Self>, conn: C) -> Result<(), IoError>
    where
        C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        let (reader, writer) = split(conn);
        let reader = BufReader::new(reader);
        let writer = BufWriter::new(writer);
        let this = self.clone();
        spawn(async move {
            if let Err(err) = this.handler(reader, writer).await {
                eprintln!("Error in handler: {}", err);
            }
        });
        Ok(())
    }

    async fn run_local<P>(self: Arc<Self>, path: P) -> Result<(), IoError>
    where
        P: AsRef<Path> + Send,
    {
        /* some code that also calls `buffer_and_spawn` method */
    }

    async fn run_network<A>(self: Arc<Self>, addrs: A) -> Result<(), IoError>
    where
        A: ToSocketAddrs + Send,
    {
        let listener = TcpListener::bind(addrs).await?;
        loop {
            let (conn, _addr) = listener.accept().await?;
            self.buffer_and_spawn(conn)?;
        }
    }
}

Question 1: Is it possible to somehow re-combine the read and write halves of the stream after buffering them here?

let (reader, writer) = split(conn);
let reader = BufReader::new(reader);
let writer = BufWriter::new(writer);

Such that I can pass a single value (e.g. conn) to handler (instead of separate reader and writer values)? I assume that unsplit won't work here, because the reader isn't of type ReadHalf anymore after buffering it.

Question 2: What's the idiomatic way to pass I/O streams to handlers? A single value that implements AsyncBufRead + AsyncWrite or two separate values?

For this case, it looks like you want BufStream

1 Like

Oh, I didn't know about BufSteam. That drastically simplifies my code:

fn buffer_and_spawn<C>(self: &Arc<Self>, conn: C) -> Result<(), IoError>
where
    C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let this = self.clone();
    spawn(async move {
        if let Err(err) = this.handler(BufStream::new(conn)).await {
            eprintln!("Error in handler: {}", err);
        }
    });
    Ok(())
}

I still think this isn't a generic solution in all cases. Not sure if that is a generic solution though, e.g. if I want to buffer only one half, or use different means of buffering. But for my use-case, it will work fine!

Considering that BufStream exists, I'll assume passing a single value is a reasonable way to go.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.