Tokio, partial reads and BytesMut

I'm building an encrypting TCP proxy system where one peer sends a series of encrypted fixed-size messages to the other peer, which decrypts them. It uses Tokio 0.2, the bytes crate for buffer management and Sodiumoxide for encryption. Every once in a while, the messages fail to decrypt. Turns out that this happens whenever more than one read is required to get a complete message.

The essence of the "read and decrypt" operation is a loop that calls read_buf() on the TCP stream's ReadHalf, which attempts to fill up a BytesMut buffer. When the length of the buffer equals the expected message size, decryption is attempted and the buffer is cleared (with clear())

On partial reads, something I don't expect is happening. A specific example is this:

  1. The sender sends two messages. Just for the sake of illustration, the two messages are "hello world" and "how are you".
  2. The receiver takes two calls to read_buf() to read the the first message. For the purposes of this example, let's say the reads are of length 7 and 4. So the expected chunks would be "hello w" and "orld"
  3. What ends up in the buffer, however, is the following: "hello whow ". The last part of the first message was apparently discarded and the first part of the next message appears in it place.

I have a logic bug here that I can't spot, or I don't know the proper technique of accumulating data into the BytesMut buffer. Any ideas what I am doing wrong?

It's hard to analyze your problem or suggest corrections when you don't provide a link or post any sample code. Can you reduce or abstract your code to keep the private aspects to yourself, then post that? Or put a minimal example on the Rust playground?

Yes, here is the code where it happens, edited for brevity:

let mut message: usize = 0;
let mut bytes_transferred: usize = 0;
let mut buf = BytesMut::with_capacity(IO_BUF_SZ);
loop {
    let ciphertext_len = reader.read_buf(&mut buf).await?;
    trace!(message, ciphertext_len, IO_BUF_SZ, ciphertext = ?buf[..]);
    if ciphertext_len == 0 {
        trace!("EOF");
        break;
    } else if buf.len() == IO_BUF_SZ {
        message += 1;
        let (padded_plaintext, _tag) = stream.pull(&buf, None).map_err(|_| {
            Error::DecryptMsg(message, buf.len())
        })?;
        /* Code to un-pad the padded plaintext and forward it. Elided
            for clarity.
        */
        buf.clear();
    }
}

Shouldn't that be ciphertext_len == IO_BUF_SZ? buf.len() is always equal.

I wondered if that comparison was valid myself. According to the documentation it's how many bytes it contains, which is distinct (I think anyway) from the capacity.

Well ok, the code above turned out to be correct: it accumulates data correctly whether it comes all at once or in pieces. The problem was the sending code, which wrote a buffer without checking to see if all of the buffer was transmitted. That meant that a short write of message 1 was followed by a write of message 2. Now the strange data that appeared at the other side makes sense.

<head hits desk>

By changing the calls to write() to write_all(), all the data makes it. Moral of the story: both reads and writes can be partial, which in hindsight is perfectly straightforward.

1 Like