Does it make sense to implement std::io::Read/Write for a transaction-based protocol?

I need a second opinion.

I am implementing a protocol that is based on transactions, i.e. you send a payload of bytes, e.g. Box<[u8]>, and receive a response std::io::Result<Box<[u8]>> for it.

Does it make sense to expose an interface for this kind of API that implements std::io::Read and std::io::Write?

I wrote the following prototype:


```rust
use std::io::{ErrorKind, Read, Write};
use std::sync::mpsc::{Receiver, Sender};

use log::info; // assumed: the logging macro comes from the `log` crate

// `Request` is the crate's own type (not shown here): `Request::new(payload)`
// returns the request together with a `Receiver` for its response.

#[derive(Debug)]
pub struct Host2<const BUF_SIZE: usize>(State<BUF_SIZE>);

impl<const BUF_SIZE: usize> Host2<BUF_SIZE> {
    #[must_use]
    pub const fn new(sender: Sender<Request>) -> Self {
        Self(State::WritingToBuffer {
            sender,
            buffer: heapless::Vec::new(),
        })
    }
}

impl<const BUF_SIZE: usize> Write for Host2<BUF_SIZE> {
    /// Appends `buf` to the pending request; nothing is sent before `flush`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if let State::WritingToBuffer { buffer, .. } = &mut self.0 {
            info!("Writing: {:?}", buf);
            info!("Writing: {} bytes.", buf.len());
            return buffer
                .extend_from_slice(buf)
                .map(|_| buf.len())
                .map_err(|_| {
                    std::io::Error::new(ErrorKind::OutOfMemory, "ASHv2: Buffer is full.")
                });
        }

        Err(std::io::Error::new(
            ErrorKind::Other,
            "ASHv2: Cannot write in current state.",
        ))
    }

    /// Sends the buffered payload as one request; allowed once per transaction.
    fn flush(&mut self) -> std::io::Result<()> {
        if let State::WritingToBuffer { sender, buffer } = &mut self.0 {
            let (request, receiver) = Request::new(buffer.as_slice().into());
            let result = sender.send(request).map_err(|_| {
                std::io::Error::new(ErrorKind::BrokenPipe, "ASHv2: Failed to send request.")
            });
            self.0 = State::Flushed {
                sender: sender.clone(),
                receiver,
            };
            return result;
        }

        Err(std::io::Error::new(
            ErrorKind::Other,
            "ASHv2: Can only flush once after writing data.",
        ))
    }
}

impl<const BUF_SIZE: usize> Read for Host2<BUF_SIZE> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // First read after `flush`: block until the response arrives, then buffer it.
        if let State::Flushed { sender, receiver } = &self.0 {
            match receiver.recv() {
                Ok(result) => {
                    let mut buffer = heapless::Vec::new();
                    buffer.extend_from_slice(&result?).map_err(|_| {
                        std::io::Error::new(ErrorKind::OutOfMemory, "ASHv2: Buffer is full.")
                    })?;
                    self.0 = State::ReadingFromBuffer {
                        sender: sender.clone(),
                        buffer,
                    };
                }
                Err(_) => {
                    return Err(std::io::Error::new(
                        ErrorKind::BrokenPipe,
                        "ASHv2: Response channel disconnected.",
                    ))
                }
            }
        }

        if let State::ReadingFromBuffer { sender, buffer } = &mut self.0 {
            if buffer.is_empty() {
                // Response fully consumed: report EOF and start the next transaction.
                self.0 = State::WritingToBuffer {
                    sender: sender.clone(),
                    buffer: heapless::Vec::new(),
                };
                return Ok(0);
            }

            // Hand out bytes from the *front* of the response, then drop them by
            // shifting the remainder forward. (Shrinking the length with `set_len`
            // would discard the *tail* and return the head again on the next call.)
            let bytes = buf.len().min(buffer.len());
            buf[..bytes].copy_from_slice(&buffer[..bytes]);
            buffer.rotate_left(bytes);
            buffer.truncate(buffer.len() - bytes);
            return Ok(bytes);
        }

        // Not flushed yet: nothing to read.
        Ok(0)
    }
}

/// Transaction phases: collect the request, wait for the response, hand it out.
#[derive(Debug)]
enum State<const BUF_SIZE: usize> {
    /// Collecting the request payload via `write`.
    WritingToBuffer {
        sender: Sender<Request>,
        buffer: heapless::Vec<u8, BUF_SIZE>,
    },
    /// Request sent; the response has not been received yet.
    Flushed {
        sender: Sender<Request>,
        receiver: Receiver<std::io::Result<Box<[u8]>>>,
    },
    /// Handing out the response via `read`.
    ReadingFromBuffer {
        sender: Sender<Request>,
        buffer: heapless::Vec<u8, BUF_SIZE>,
    },
}
```

But this feels and looks wrong, since the transaction-based protocol forces me to manage internal state.

What do you think: should I implement these traits, or just stick with the transaction-based API I already have?

```rust
impl<T> SyncAsh for T
where
    T: Host,
{
    fn communicate(&self, payload: &[u8]) -> std::io::Result<Box<[u8]>> {
        let (request, response) = Request::new(payload.into());
        self.send(request).map_err(|_| {
            std::io::Error::new(ErrorKind::BrokenPipe, "ASHv2: Failed to send request.")
        })?;
        response.recv().map_err(|_| {
            std::io::Error::new(
                ErrorKind::BrokenPipe,
                "ASHv2: Response channel disconnected.",
            )
        })?
    }
}
```

I don't see why not. All kinds of things implement Read and Write while maintaining a great deal of state and running protocols under the hood. Think of reading and writing files, for example: underneath sits the entire operating system, including all the transactions between your machine and the storage device.

It doesn't look like Read/Write is the right tool here. Those traits aren't meant to carry transactions like this, and the behavior certainly shouldn't change when you add or remove calls to flush in the middle. You'll also find that wrappers like BufWriter decide on their own when data reaches the inner writer: BufWriter flushes its buffer to the inner writer from inside write when the new data would not fit.
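To make the point about flush concrete, here is a small self-contained sketch. FlushFramed is a hypothetical stand-in for the prototype's behavior: it collects written bytes and treats every flush as the end of one request. The same bytes are written twice, once with an extra flush in the middle (perfectly legal for any Write user), and the sink ends up with two requests instead of one.

```rust
use std::io::{self, Write};

/// Hypothetical sink that, like the prototype, treats every `flush` as the
/// end of one request and records the requests it would have sent.
#[derive(Debug, Default)]
struct FlushFramed {
    pending: Vec<u8>,
    requests: Vec<Vec<u8>>,
}

impl Write for FlushFramed {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Each flush "sends" whatever has accumulated since the last one.
        self.requests.push(std::mem::take(&mut self.pending));
        Ok(())
    }
}

/// Writes the same bytes, optionally with one extra `flush` in the middle.
fn requests_sent(extra_flush: bool) -> io::Result<Vec<Vec<u8>>> {
    let mut sink = FlushFramed::default();
    sink.write_all(b"head")?;
    if extra_flush {
        // Harmless for a byte stream, but here it ends the request early.
        sink.flush()?;
    }
    sink.write_all(b"tail")?;
    sink.flush()?;
    Ok(sink.requests)
}

fn main() -> io::Result<()> {
    for extra_flush in [false, true] {
        let requests = requests_sent(extra_flush)?;
        let shown: Vec<_> = requests.iter().map(|r| String::from_utf8_lossy(r)).collect();
        println!("extra_flush = {extra_flush}: {shown:?}");
    }
    Ok(())
}
```

This prints `extra_flush = false: ["headtail"]` followed by `extra_flush = true: ["head", "tail"]`. A plain byte sink would have received the same eight bytes in both cases.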


Yeah. I have the same concerns. I basically "lie" to the user that they have something that they can arbitrarily write to and read from, when in reality, the only working flow is write [, write, ...], flush, read-until-end, repeat.
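That flow can be modelled without channels to see where the Read/Write contract stops helping. MockHost below is a hypothetical in-memory imitation of the prototype's states (the "device" just upper-cases the request), and transact is the write/flush/read-to-end cycle. It works when followed exactly, but a caller that skips the flush gets an empty response instead of an error.

```rust
use std::io::{self, ErrorKind, Read, Write};

/// Hypothetical in-memory imitation of the prototype's state machine.
/// The "device" answers every request with the payload in upper case.
#[derive(Debug)]
enum MockHost {
    /// Collecting the request.
    Writing(Vec<u8>),
    /// Handing out the response; `pos` is how much has been read so far.
    Reading { response: Vec<u8>, pos: usize },
}

impl Write for MockHost {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            MockHost::Writing(request) => {
                request.extend_from_slice(buf);
                Ok(buf.len())
            }
            MockHost::Reading { .. } => Err(io::Error::new(ErrorKind::Other, "read the response first")),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            MockHost::Writing(request) => {
                // End of the transaction: "send" the request and keep the reply.
                let response = request.to_ascii_uppercase();
                *self = MockHost::Reading { response, pos: 0 };
                Ok(())
            }
            MockHost::Reading { .. } => Err(io::Error::new(ErrorKind::Other, "already flushed")),
        }
    }
}

impl Read for MockHost {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            // Reading before flushing is indistinguishable from an empty stream.
            MockHost::Writing(_) => Ok(0),
            MockHost::Reading { response, pos } => {
                if *pos == response.len() {
                    // Response fully consumed: report EOF and accept the next request.
                    *self = MockHost::Writing(Vec::new());
                    return Ok(0);
                }
                let n = buf.len().min(response.len() - *pos);
                buf[..n].copy_from_slice(&response[*pos..*pos + n]);
                *pos += n;
                Ok(n)
            }
        }
    }
}

/// The only working flow: write (possibly several times), flush once, read to EOF.
fn transact<T: Read + Write>(io: &mut T, payload: &[u8]) -> io::Result<Vec<u8>> {
    io.write_all(payload)?;
    io.flush()?;
    let mut response = Vec::new();
    io.read_to_end(&mut response)?;
    Ok(response)
}

fn main() -> io::Result<()> {
    let mut host = MockHost::Writing(Vec::new());
    assert_eq!(transact(&mut host, b"ping")?, b"PING".to_vec());
    assert_eq!(transact(&mut host, b"again")?, b"AGAIN".to_vec());

    // A generic caller that forgets to flush silently reads nothing.
    host.write_all(b"lost")?;
    let mut response = Vec::new();
    assert_eq!(host.read_to_end(&mut response)?, 0);
    Ok(())
}
```

Nothing in the Read + Write bound tells a generic caller that the flush is mandatory, which is exactly the "lie" described above.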

It would be nice to have a trait for "atomic" communication in std::io, like

Do you really need a trait? I think you can just have a struct with a normal method on it.
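Following that suggestion, a plain struct with an inherent method could look like the sketch below. It uses only std::sync::mpsc; the Request type with its reply field, the Host struct and the worker thread that reverses each payload are hypothetical stand-ins for the real protocol machinery.

```rust
use std::io::{self, ErrorKind};
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical request: the payload plus a one-shot channel for the reply.
struct Request {
    payload: Box<[u8]>,
    reply: Sender<io::Result<Box<[u8]>>>,
}

/// A plain struct around the channel; `communicate` is an inherent method.
struct Host {
    sender: Sender<Request>,
}

impl Host {
    fn new(sender: Sender<Request>) -> Self {
        Self { sender }
    }

    /// Sends one request and blocks until its response arrives.
    fn communicate(&self, payload: &[u8]) -> io::Result<Box<[u8]>> {
        let (reply, response) = mpsc::channel();
        self.sender
            .send(Request { payload: payload.into(), reply })
            .map_err(|_| io::Error::new(ErrorKind::BrokenPipe, "Failed to send request."))?;
        response
            .recv()
            .map_err(|_| io::Error::new(ErrorKind::BrokenPipe, "Response channel disconnected."))?
    }
}

fn main() -> io::Result<()> {
    let (sender, requests) = mpsc::channel::<Request>();
    // Stand-in for the protocol worker: replies with the payload reversed.
    let worker = thread::spawn(move || {
        for request in requests {
            let mut answer = request.payload.to_vec();
            answer.reverse();
            // The caller may have given up; ignore a closed reply channel.
            let _ = request.reply.send(Ok(answer.into_boxed_slice()));
        }
    });

    let host = Host::new(sender);
    println!("{:?}", host.communicate(b"abc")?); // [99, 98, 97]
    drop(host); // closes the request channel, so the worker loop ends
    worker.join().expect("worker thread panicked");
    Ok(())
}
```

The signature alone states the contract: one payload in, one response out, with no intermediate states for a caller to get wrong.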

It is technically not necessary. The current implementation uses custom traits (1, 2) only because I wanted to avoid writing wrappers around Sender and SyncSender, respectively.
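For comparison, such a wrapper need not be large. Below is one hypothetical shape (AnySender is an illustrative name, not from the original code): an enum over std::sync::mpsc::Sender and SyncSender with a single send method, whose signature matches the one both channel types already share.

```rust
use std::sync::mpsc::{self, SendError, Sender, SyncSender};

/// Hypothetical wrapper over both std channel flavours, so callers can hold
/// one concrete type instead of going through a custom trait.
enum AnySender<T> {
    Unbounded(Sender<T>),
    Bounded(SyncSender<T>),
}

impl<T> AnySender<T> {
    /// Same signature as `Sender::send` and `SyncSender::send`.
    fn send(&self, value: T) -> Result<(), SendError<T>> {
        match self {
            Self::Unbounded(sender) => sender.send(value),
            // May block while the bounded channel is full.
            Self::Bounded(sender) => sender.send(value),
        }
    }
}

impl<T> From<Sender<T>> for AnySender<T> {
    fn from(sender: Sender<T>) -> Self {
        Self::Unbounded(sender)
    }
}

impl<T> From<SyncSender<T>> for AnySender<T> {
    fn from(sender: SyncSender<T>) -> Self {
        Self::Bounded(sender)
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let (sync_tx, sync_rx) = mpsc::sync_channel(1);
    let senders: [AnySender<&str>; 2] = [tx.into(), sync_tx.into()];
    for sender in &senders {
        sender.send("hello").expect("receiver still alive");
    }
    println!("{:?} {:?}", rx.recv(), sync_rx.recv()); // Ok("hello") Ok("hello")
}
```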

It would, however, be nice to have an API that supports some standard library interface.

Ah, sorry, I should have looked at your code a bit harder.

I agree. I think that, as well as implementing the methods, parameters, return values and types that a trait calls for, one should adhere to the intended meaning of the trait.

I mean, one could send Morse code via the Read/Write traits (dot = read(), dash = write()), but that would not be in the spirit of the thing.

Kind of like misusing operator overloading to do things that are not obvious from the operators.


Traits are not there to make an API nice; they are used for abstraction when there will be multiple implementations of the trait.
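As an illustration of that criterion, a trait starts paying for itself once a second implementation exists, for example an in-memory test double next to the real link. Transact, Loopback and Scripted below are hypothetical names, and the real device link is not modelled.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io::{self, ErrorKind};

/// Hypothetical trait for one request/response exchange.
trait Transact {
    fn communicate(&self, payload: &[u8]) -> io::Result<Box<[u8]>>;
}

/// First implementation: echoes the payload (stand-in for a real link).
struct Loopback;

impl Transact for Loopback {
    fn communicate(&self, payload: &[u8]) -> io::Result<Box<[u8]>> {
        Ok(payload.into())
    }
}

/// Second implementation: a test double replaying canned responses in order.
struct Scripted {
    responses: RefCell<VecDeque<Box<[u8]>>>,
}

impl Transact for Scripted {
    fn communicate(&self, _payload: &[u8]) -> io::Result<Box<[u8]>> {
        self.responses
            .borrow_mut()
            .pop_front()
            .ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "no scripted response left"))
    }
}

/// Code written against the trait accepts either implementation.
fn response_len<T: Transact>(link: &T, payload: &[u8]) -> io::Result<usize> {
    Ok(link.communicate(payload)?.len())
}

fn main() -> io::Result<()> {
    println!("{}", response_len(&Loopback, b"abcd")?); // 4
    let scripted = Scripted {
        responses: RefCell::new(VecDeque::from([b"ok".to_vec().into_boxed_slice()])),
    };
    println!("{}", response_len(&scripted, b"anything")?); // 2
    println!("{}", response_len(&scripted, b"again").is_err()); // true
    Ok(())
}
```

With only one implementation, the same code could be an inherent method, and nothing would be lost.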


Given the protocol you linked in your internals post, this looks like something that would be better handled via tokio_util::codec's Encoder and Decoder traits, along with the Framed abstraction it provides for sending and receiving bytes.

