How do `Stream` and `Read` actually work?

I call a foreign API and I might get back what I want, or I might get back an error stream.

So I have some code that handles this:

// FIXME: how to `read_until` rather than blindly accepting
// the first chunk, no matter how big the stream?
use futures::TryStreamExt; // for `try_next`

let mut error = String::new();
let mut len = 0;
while let Some(bytes) = error_stream.try_next().await? {
    match std::str::from_utf8(&bytes) {
        Ok(s) => error.push_str(s),
        Err(_) => break,
    }
    len += bytes.len();
    if len > 1024 {
        break;
    }
}

But notice that if they decide to hand me a single chunk of bytes that is super massive, I'm screwed?

I thought about converting it into an AsyncRead with StreamReader, which has read_buf, which will stop reading once my buffer is full. But I was wondering: if I know the AsyncRead is actually a Stream under the hood, does that mean I'm simply masking the problem with a correct-looking abstraction, or does it actually fix the problem?

Ultimately, there's nothing you can do to stop the stream from delivering a massive chunk. However, streams generally won't do that. It's generally fine to just stop once you get to a chunk that pushes you over the maximum.

StreamReader doesn't change anything here. It just stores the current chunk and reads from that, waiting for a new chunk when the current chunk is empty.
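For example, here's a minimal sketch of that approach, assuming the chunks are bytes::Bytes and the stream is a futures TryStream (collect_error and MAX_ERROR_LEN are just illustrative names):

```rust
use futures::stream::{TryStream, TryStreamExt};

const MAX_ERROR_LEN: usize = 1024;

// Illustrative helper: collect at most MAX_ERROR_LEN bytes of the error body,
// truncating the chunk that pushes us over the limit instead of trusting its size.
async fn collect_error<S, E>(mut error_stream: S) -> Result<String, E>
where
    S: TryStream<Ok = bytes::Bytes, Error = E> + Unpin,
{
    let mut buf = Vec::with_capacity(MAX_ERROR_LEN);
    while let Some(chunk) = error_stream.try_next().await? {
        let remaining = MAX_ERROR_LEN - buf.len();
        buf.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
        if buf.len() == MAX_ERROR_LEN {
            break;
        }
    }
    // The cut may land mid-character; from_utf8_lossy replaces any partial
    // UTF-8 sequence with U+FFFD rather than failing.
    Ok(String::from_utf8_lossy(&buf).into_owned())
}
```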

Hi Alice!

Thank you for your advice.

So a Stream is a Rust-land concept, I get that, so the libraries I use are trusted, and we trust they will do the right thing.

But at some point, as you get lower level, it comes down to the OS, the kernel and the network driver, I guess...

Could you (or anyone) maybe explain the chain of command, from some data outside of my computer, all the way up to a high-level Stream?

I guess it maybe starts with the TCP "packet" (I've also heard it called a "segment")... I guess that's the smallest unit of data that can be received... I think that's determined by your OS, right? Then how does it enter Rust? Must it by definition be a Read before it becomes an AsyncRead and then finally a Stream? etc.

Thank you so much for sharing this wisdom!!

Ultimately, it comes down to how the chunks are created. If they are created by reading from a TcpStream (more generally, any AsyncRead), then you pass a maximum length when reading data (the maximum length is the length of the buffer you are reading into), so you can control the maximum chunk size that way. For example, this is how ReaderStream works.
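A rough sketch of that, assuming tokio_util's ReaderStream::with_capacity constructor (the address and the 4096-byte capacity are just placeholders):

```rust
use futures::StreamExt;
use tokio::net::TcpStream;
use tokio_util::io::ReaderStream;

// Each chunk yielded by ReaderStream comes from one read into its internal
// buffer, so no chunk can be larger than the capacity given here (4096 bytes).
async fn tcp_chunks(addr: &str) -> std::io::Result<()> {
    let socket = TcpStream::connect(addr).await?;
    let mut chunks = ReaderStream::with_capacity(socket, 4096);
    while let Some(chunk) = chunks.next().await {
        let chunk = chunk?; // bytes::Bytes, at most 4096 bytes long
        println!("got {} bytes", chunk.len());
    }
    Ok(())
}
```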

Of course, in principle if you have a step somewhere in your stream that increases chunk sizes, e.g. decompression, then you could end up with a zip-bomb vulnerability where chunks could get very large.
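One way to bound that is to cap the output side of the decompression step. A sketch, where read_bounded and the 1 MiB limit are made up for illustration, using tokio's AsyncReadExt::take, which refuses to yield more than a fixed number of bytes no matter what the underlying reader produces:

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// `decompressed` is assumed to be the AsyncRead on the *output* side of a
// decompression step; `take` bounds how many bytes it can ever yield,
// regardless of the compression ratio of the input.
async fn read_bounded<R: AsyncRead + Unpin>(decompressed: R) -> std::io::Result<Vec<u8>> {
    const LIMIT: u64 = 1024 * 1024; // assumption: 1 MiB is an acceptable bound here
    let mut limited = decompressed.take(LIMIT);
    let mut out = Vec::new();
    limited.read_to_end(&mut out).await?;
    Ok(out)
}
```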

The following is simplified, but close enough to accurate to be a useful mental model; it's not enough if you're writing the OS, though.

At the bottommost level, you have a piece of hardware such as a network card. This supplies data to the OS in chunks whose range of sizes is set by the hardware design; for example, the WiFi chip in my current laptop receives data in chunks of up to 6,500,631 bytes at a time (it supports the maximum possible A-MPDU size for 802.11ax). That is a hardware limit and cannot be exceeded, though you can get back less than that; for example, a single packet might come in carrying just 1 byte of payload. Some of those 6,500,631 bytes are not useful payload - they include things like the WiFi framing that allows the WiFi chip to recognise that this payload is for it to receive, the IP framing that lets the OS recognise that these are IP packets, the TCP framing that turns packets into a stream of bytes, and so on.

My OS has set things up so that when the WiFi chip receives data, it writes it to RAM in an agreed format and notifies the OS that it has done so. The data that comes in tells the OS not just that there is payload, but how much payload there is - it could be a single frame carrying 1 byte, or it could be a maximum-size A-MPDU carrying 6 MiB of payload.

A Rust program then asks the OS to pass it the data that's been received; it tells the OS how much data it's willing to read in one go. The OS then moves at most that much data from the network receive area (where it's been buffered by the OS) into a buffer supplied by the Rust program. The Rust program is also able to ask the OS whether there is data waiting to be read; Tokio uses this to avoid repeatedly asking if there's data waiting from the network, for example.
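To make that step concrete, here's a minimal sketch from the Rust side with Tokio (the function name and the 4096-byte buffer are just illustrative):

```rust
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

// The length of `buf` is the "how much data I'm willing to read in one go"
// figure handed to the OS; a single read can never return more than
// buf.len() bytes, and may well return fewer.
async fn read_some(socket: &mut TcpStream) -> std::io::Result<()> {
    let mut buf = [0u8; 4096];
    let n = socket.read(&mut buf).await?; // n <= 4096; 0 means the peer closed
    println!("the OS handed us {n} bytes");
    Ok(())
}
```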

Once the data has been read, you have a &[u8]; something in your Rust program will translate this into a T (for example using tokio_util::codec), which it then makes available as an impl Stream<Item = T> for you to await as needed by your program.
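A small sketch of that last translation step (the line-based framing and the 8 KiB limit are just examples), using tokio_util's FramedRead with LinesCodec to turn the raw bytes into a Stream of Strings:

```rust
use futures::StreamExt;
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, LinesCodec};

// FramedRead reads from the AsyncRead and feeds the bytes through the codec,
// yielding one decoded item per frame - here, one String per line.
// new_with_max_length also bounds how large a single item may get.
async fn lines_from(socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut lines = FramedRead::new(socket, LinesCodec::new_with_max_length(8 * 1024));
    while let Some(line) = lines.next().await {
        let line: String = line?;
        println!("{line}");
    }
    Ok(())
}
```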
