I am trying to write a bridge from `Read` to `tokio::io::AsyncRead`, i.e. the inverse of `tokio_util::io::SyncIoBridge`.
I feel that this should be simpler than this and I am probably missing something crucial:
struct AsyncReadBridge<T> {
read: Option<T>,
send: UnboundedSender<Bytes>,
recv: UnboundedReceiver<Bytes>,
rt: tokio::runtime::Handle,
}
impl<T: Read + Unpin + Send> AsyncReadBridge<T> {
fn new(read: T) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
Self {
read: Some(read),
send: tx,
recv: rx,
rt: tokio::runtime::Handle::current(),
}
}
}
impl<T: Read + Unpin + Send + 'static> AsyncRead for AsyncReadBridge<T> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.get_mut();
if let Some(mut read) = this.read.take() {
let len = buf.capacity();
let send = this.send.clone();
this.rt.spawn_blocking(move || {
let mut buf = BytesMut::zeroed(len);
loop {
match read.read(&mut buf) {
Ok(l) if l == 0 => {
if let Err(err) = send.send("".into()) {
error!("Could not send: {err:?}");
}
break;
},
Ok(l) => {
let msg = Bytes::copy_from_slice(&buf[0..l]);
if let Err(err) = send.send(msg) {
error!("Could not send: {err:?}");
break;
}
}
Err(err) => {
error!("Could not read: {err:?}");
break;
}
};
}
});
}
match this.recv.poll_recv(cx) {
Poll::Ready(Some(mut bytes)) => {
buf.put_slice(&mut bytes);
Poll::Ready(Ok(()))
}
Poll::Ready(None) => {
Poll::Ready(Ok(()))
},
Poll::Pending => Poll::Pending,
}
}
}
This is not great: there are too many allocations, and the blocking task is spawned but never joined. I think I could also poll the join handle, though that would make this a bit more complicated. I am also not sure whether this can be implemented without so many allocations.
Is there a better way to do this?