Briding sync and async, Read to AsyncRead

I am trying to write a bridge from Read to tokio::io::AsyncRead. The inverse of tokio_util::io::SyncIoBridge.

I feel that this should be simpler than this and I am probably missing something crucial:

struct AsyncReadBridge<T> {
    read: Option<T>,
    send: UnboundedSender<Bytes>,
    recv: UnboundedReceiver<Bytes>,
    rt: tokio::runtime::Handle,
}

impl<T: Read + Unpin + Send> AsyncReadBridge<T> {
    fn new(read: T) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();

        Self {
            read: Some(read),
            send: tx,
            recv: rx,
            rt: tokio::runtime::Handle::current(),
        }
    }
}

impl<T: Read + Unpin + Send + 'static> AsyncRead for AsyncReadBridge<T> {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.get_mut();

        if let Some(mut read) = this.read.take() {
            let len = buf.capacity();
            let send = this.send.clone();

            this.rt.spawn_blocking(move || {
                let mut buf = BytesMut::zeroed(len);

                loop {
                    match read.read(&mut buf) {
                        Ok(l) if l == 0 => {
                            if let Err(err) = send.send("".into()) {
                                error!("Could not send: {err:?}");
                            }
                            break;
                        },
                        Ok(l) => {
                            let msg = Bytes::copy_from_slice(&buf[0..l]);

                            if let Err(err) = send.send(msg) {
                                error!("Could not send: {err:?}");
                                break;
                            }
                        }
                        Err(err) => {
                            error!("Could not read: {err:?}");
                            break;
                        }
                    };
                }
            });
        }

        match this.recv.poll_recv(cx) {
            Poll::Ready(Some(mut bytes)) => {
                buf.put_slice(&mut bytes);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(None) => {
                Poll::Ready(Ok(()))
            },
            Poll::Pending => Poll::Pending,
        }
    }
}

This is not great, too many allocations and a task that we never join on later. I think I could poll the join handler also, making this a bit more complicated. But not sure if I can implement this without this many allocations?

Is there a better way to do this?

You don't send back a read error so a program will hang (never poll ready on error.)
More important buf.put_slice(&mut bytes); the length is not fixed so will panic once buff is smaller than bytes.

I think this is an implementation (hidden away) (not looked far.)
https://docs.rs/crate/tokio/1.26.0/source/src/io/blocking.rs

Thanks that was helpful, I see how I can avoid using a channel now.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.