How can I take data from a Stream and send it to a Sink across multiple futures?

I need to take data from a Stream, process it, and send it to a Sink.

In this simplified example, I take a stream::iter(...) of bytes, don't do any processing, and then send them to a TcpStream. This requires having multiple references to the TcpStream across different futures. Those references also need to be mutable, so I think I need to wrap the TcpStream in an Arc<Mutex<>>, clone it for each future, and then lock it within the future.

This is not a conventional way to use a TcpStream, but it's a minimal example of the problem I'm experiencing.

use async_std::task;
use futures::{stream, StreamExt};
use std::{
    fmt::Debug,
    io::{Read, Write},
    net::SocketAddr,
    sync::{Arc, Mutex},
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    let listen_addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
    task::block_on(async {
        let client = std::net::TcpStream::connect(listen_addr)?;
        let client_amx = Arc::new(Mutex::new(client));
        let bytes = [1, 2, 3];
        stream::iter(bytes)
            .for_each(|byte| {
                let client_amx = client_amx.clone();
                async move {
                    let mut client = client_amx.lock().unwrap();
                    match client.write(&[byte]) {
                        Ok(_) => {}
                        Err(e) => log::error!("{e:?}"),
                    }
                }
            })
            .await;
        let client = client_amx.lock().unwrap();
        assert_eq!(
            client.bytes().map(|b| b.unwrap()).collect::<Vec<u8>>(),
            bytes
        );
        Result::<(), std::io::Error>::Ok(())
    })?;
    Ok(())
}

The type error I'm getting is "cannot move out of dereference of std::sync::MutexGuard<'_, std::net::TcpStream>". I've also tried pinning the TcpStream before wrapping it, but Pin<Box<>> only implements DerefMut when the thing it's pointing to does as well.

I suspect I am misunderstanding the way to properly send a Sink across threads. How can I get a single sink to accept data that may be sent from different futures?
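
For what it's worth, the "cannot move out of dereference" error above most likely comes from client.bytes(): Read::bytes takes self by value, so calling it through a MutexGuard tries to move the TcpStream out of the guard. Below is a minimal sketch of one way around that, borrowing the reader with by_ref() first. The Cursor stands in for the TcpStream so the example runs without a network, and read_all_locked is a hypothetical helper name, not something from the code above.

```rust
use std::io::{Cursor, Read};
use std::sync::{Arc, Mutex};

/// Reads every remaining byte from a reader that lives behind a mutex.
/// (Hypothetical helper, for illustration only.)
fn read_all_locked<R: Read>(shared: &Arc<Mutex<R>>) -> Vec<u8> {
    // The guard must be mutable because reading needs `&mut R`.
    let mut guard = shared.lock().unwrap();
    // `guard.bytes()` would try to move `R` out of the guard.
    // `by_ref()` yields `&mut R`, which also implements `Read`,
    // so `bytes()` consumes only the borrow.
    let out: Vec<u8> = guard.by_ref().bytes().map(|b| b.unwrap()).collect();
    out
}

fn main() {
    // Assumption: an in-memory Cursor replaces the TcpStream.
    let shared = Arc::new(Mutex::new(Cursor::new(vec![1u8, 2, 3])));
    assert_eq!(read_all_locked(&shared), vec![1u8, 2, 3]);
}
```

This only addresses the compile error. It still locks a blocking resource, so it is not by itself a good fit for async code.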

Okay I stepped away for a bit and now I think the solution here is to create two futures and use a channel to communicate between them.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    let listen_addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
    task::block_on(async {
        let mut client = std::net::TcpStream::connect(listen_addr)?;
        let (tx, mut rx) = mpsc::channel(32);
        let tx = Arc::new(Mutex::new(tx));
        let bytes = [1u8, 2, 3];
        let send_fut = stream::iter(bytes).for_each(|byte| {
            let tx = tx.clone();
            async move {
                let mut tx = tx.lock().unwrap();
                match tx.send(byte).await {
                    Ok(_) => {}
                    Err(e) => log::error!("{e:?}"),
                }
            }
        });
        let recv_fut = async {
            // Receive exactly as many bytes as were sent.
            for _ in 0..bytes.len() {
                if let Some(byte) = rx.next().await {
                    match client.write(&[byte]) {
                        Ok(_) => {}
                        Err(e) => log::error!("{e:?}"),
                    }
                }
            }
        };
        futures::future::join(send_fut, recv_fut).await;
        assert_eq!(
            client.bytes().map(|b| b.unwrap()).collect::<Vec<u8>>(),
            bytes
        );
        Result::<(), std::io::Error>::Ok(())
    })?;
    Ok(())
}

You are using blocking IO resources and locks, which should not be used in async code: they block the executor thread instead of yielding to it. Read this article for an explanation of why this is bad.

Additionally, there's no reason to put a tx in an Arc/Mutex because an mpsc sender can be cloned on its own. This should still work:

 let (tx, mut rx) = mpsc::channel(32);
-let tx = Arc::new(Mutex::new(tx));
 let bytes = [1u8, 2, 3];
 let send_fut = stream::iter(bytes).for_each(|byte| {
-    let tx = tx.clone();
+    let mut tx = tx.clone();
     async move {
-        let mut tx = tx.lock().unwrap();
         match tx.send(byte).await {
             Ok(_) => {}
             Err(e) => log::error!("{e:?}"),
         }
     }
 });