I need to take data from a Stream, process it, and send it to a Sink.
In this simplified example, I take a stream::iter(...)
of bytes, don't do any processing, and the send them to a TcpStream
. This requires having multiple references to the TcpClient
across different futures. It also requires those references to be mutable, so I think I need to wrap TcpClient
in an Arc<Mutex<>>
, clone it for each future, and then lock it within the future.
This is not a conventional way to use a TcpStream
, but it's a minimal example of the problem I'm experiencing.
use async_std::task;
use futures::{stream, StreamExt};
use std::{
fmt::Debug,
io::{Read, Write},
net::SocketAddr,
sync::{Arc, Mutex},
};
fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let listen_addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
task::block_on(async {
let client = std::net::TcpStream::connect(listen_addr)?;
let client_amx = Arc::new(Mutex::new(client));
let bytes = [1, 2, 3];
stream::iter(bytes)
.for_each(|byte| {
let client_amx = client_amx.clone();
async move {
let mut client = client_amx.lock().unwrap();
match client.write(&[byte]) {
Ok(_) => {}
Err(e) => log::error!("{e:?}"),
}
}
})
.await;
let client = client_amx.lock().unwrap();
assert_eq!(
client.bytes().map(|b| b.unwrap()).collect::<Vec<u8>>(),
bytes
);
Result::<(), Error>::Ok(())
})?;
Ok(())
}
The type error I'm getting is "cannot move out of dereference of std::sync::MutexGuard<'_, std::net::TcpStream>
". I've also tried to pinning the TcpStream
before wrapping it, but Pin<Box<>>
only implements DerefMut
when the thing its pointing to does as well.
I suspect I am misunderstanding the way to properly send a Sink across threads. How can get a single sink to accept data that may be sent from different futures?