How to read Uint32Array sent from JavaScript?

I'm testing a WebTransport server written in Rust. To keep track of the expected length of an incoming message, which can span multiple reads, the client's first write sends the length encoded as a Uint32Array (4 bytes). The server reads that length first, so it knows how many bytes to expect over the following reads, even when the message is N MB long.

This is what I already do with a JavaScript-based WebTransport server, and what I am trying to reproduce in Rust.

What the client does

    let data = new Uint8Array(1024**2*20);

    let header = new Uint8Array(Uint32Array.from(
      {
        length: 4,
      },
      (_, index) => (data.length >> (index * 8)) & 0xff,
    ));
    let view = new DataView(header.buffer);
    let outgoingTotalLength = view.getUint32(0, true);
    console.log({ outgoingTotalLength });
    let incomingTotalLength = 0;
    // ...
    for await (const value of readable) {
      incomingTotalLength += value.length;
      if (incomingTotalLength === outgoingTotalLength) {
        console.log({ incomingTotalLength, outgoingTotalLength });
        break;
      }
    }

In the server, something like this

    let incomingTotalLength = 0;
    let incomingCurrentLength = 0;
    const buffer = new ArrayBuffer(0, { maxByteLength: 4 });
    const view = new DataView(buffer);
    // ...
    if (incomingTotalLength === 0 && incomingCurrentLength === 0) {
              buffer.resize(4);
              for (let i = 0; i < 4; i++) {
                view.setUint8(i, value[i]);
              }
              incomingTotalLength = view.getUint32(0, true);
              console.log(value.length, incomingTotalLength);
              value = value.subarray(4);
        }
      // ...
      buffer.resize(0);
      incomingTotalLength = 0;
      incomingCurrentLength = 0;

The closest I've come to trying to do this in Rust is in a Native Messaging host, which might work here?

use std::io::{self, Read};

pub fn getMessage() -> io::Result<Vec<u8>> {
  let mut stdin = io::stdin();
  let mut length = [0; 4];
  stdin.read_exact(&mut length)?;
  let mut buffer = vec![0; u32::from_ne_bytes(length) as usize];
  stdin.read_exact(&mut buffer)?;
  Ok(buffer)
}

How would you write that out in Rust?

Oh, here's the relevant Rust code

async fn run_session(session: Session) -> anyhow::Result<()> {
    loop {
        // Wait for a bidirectional stream or datagram.
        tokio::select! {
            res = session.accept_bi() => {
                let (mut send, mut recv) = res?;
                log::info!("accepted stream");

                // Read the message and echo it back.
                let msg = recv.read_to_end(usize::MAX).await?;
                log::info!("recv: {}", String::from_utf8_lossy(&msg));

                send.write_all(&msg).await?;
                log::info!("send: {}", String::from_utf8_lossy(&msg));
            },

A generic technique for dealing with network protocol parsing is to do a debug dump of the entire packet of interest, put it into a text editor or a spreadsheet, and manually plan out how you're going to parse it before you start writing the parsing code.

You may be interested in futures_codec - Rust

This helps implementing the messier bits of framing and unframing data from a stream.

Otherwise you can read from an AsyncRead in much the same way as in JavaScript, if not a bit more easily, depending on whether you're using the one from futures or the one from tokio:

let length = input.read_u32_le().await?;
let mut buffer = vec![0u8; length as usize];
input.read_exact(&mut buffer).await?;

I couldn't get that part to work. msg is evidently a Vec<u8>.

From over here https://stackoverflow.com/q/76749778 this works

let msg = recv.read_to_end(usize::MAX).await?;
let msg_length : u32 = u32::from_le_bytes([msg[0], msg[1], msg[2], msg[3]]);
println!("Message length {}", msg_length);

verifying the bytes back in JavaScript

for await (const value of readable) {
  console.log(new DataView(value.buffer).getUint32(0, true));
  // ...
}

Now I need to complete the rest 1:1. Define two mutable variables: 1) one that stores the u32 total length; and 2) a u32 counter that is incremented as the bytes arrive, which is not always all at once. Then, when the bytes received reach the total bytes expected, set both variables back to 0.

Any idea how to do that in a very straightforward way?

Thanks. I understand the protocol. I just don't write enough Rust yet to scribble out the port from JavaScript to Rust from scratch.

If you read to the end of an unframed stream, you can only get a message by writing a whole message and then closing the sending half. If you want to keep the connection alive and send multiple messages, you need to frame the stream, that is, read exactly as much as you need for each message. Drop the read_to_end part and instead read from the stream (or better, a buffered stream impl).

The helper read_u32_le() is provided by the tokio version of AsyncReadExt I linked. It's roughly equivalent to:

trait AsyncReadExt: AsyncRead {
    async fn read_u32_le(&mut self) -> io::Result<u32> {
        let mut bytes = [0u8; 4];
        self.read_exact(&mut bytes).await?;
        Ok(u32::from_le_bytes(bytes))
    }
}

So if you're using, say, AsyncReadExt from futures::io then you will have to do the above yourself.

Note I'm using le (for "little endian") explicitly here, to match the get/setUint32(0, true), where you specify the value is little endian.
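To make the framing idea concrete, here is a minimal sketch of reading length-prefixed messages. It uses the blocking std::io::Read trait and an in-memory Cursor rather than the WebTransport receive stream, purely so it runs without extra crates; the function name read_frame is hypothetical. With tokio's AsyncReadExt the same two read_exact calls would be awaited instead.

```rust
use std::io::{self, Cursor, Read};

/// Reads one message: a 4-byte little-endian length, then exactly that many bytes.
/// `read_frame` is an illustrative name, not part of any library.
fn read_frame<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
    let mut header = [0u8; 4];
    // read_exact keeps reading until all 4 header bytes have arrived.
    input.read_exact(&mut header)?;
    let length = u32::from_le_bytes(header) as usize;

    let mut payload = vec![0u8; length];
    // The same call tracks the payload, however many underlying reads it takes.
    input.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    // Two framed messages back to back on one stream, with no close in between.
    let messages: [&[u8]; 2] = [b"hello", b"webtransport"];
    let mut wire = Vec::new();
    for msg in messages.iter() {
        wire.extend_from_slice(&(msg.len() as u32).to_le_bytes());
        wire.extend_from_slice(msg);
    }

    let mut stream = Cursor::new(wire);
    let first = read_frame(&mut stream)?;
    let second = read_frame(&mut stream)?;
    println!("{} bytes, then {} bytes", first.len(), second.len());
    Ok(())
}
```

Because each call consumes exactly one header plus one payload, the stream stays open and the next call starts at the next message boundary.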

I'm avoiding that by sending the length in a write first. Then sending N amount of data over M writes up to the encoded length sent first.

So, the idea is to always send 4 bytes, a Uint32Array in JavaScript or a u32 as bytes in Rust, representing the message length and preceding the message, if we have that luxury. That's what I do in JavaScript now, because I'm able to write 4 bytes, read those, then proceed with N MB of reads until the length is reached.

I have also read the first 4 bytes of N bytes. Then proceeded with the read until the length encoded in those first 4 bytes is reached.

Something like this in JavaScript

let data = new Uint8Array(1024**2*20);
let header = new Uint8Array(Uint32Array.from({
  length: 4,
}, (_, index) => (data.length >> (index * 8)) & 0xff, ));
let view = new DataView(header.buffer);
let outgoingTotalLength = view.getUint32(0, true);
let incomingTotalLength = 0;
const writer = writable.getWriter();
await writer.ready;
await writer.write(header).then( () => console.log(`Outgoing total length ${outgoingTotalLength} written.`));
await writer.ready;
await writer.write(data); 
// 20 MB in single write, though can be for example N 66507 
// writes, or even N 8192 writes, up to 20971520
await writer.close();

Then once the peer gets msg_length bytes, reset msg_length to 0.

Let incoming total length and incoming current length be mutable variables set to 0, defined outside of the loop.
Start asynchronous loop.
Let current length be the length of the current read of bytes.
Read; check if incoming total length is 0. If true, read the first 4 bytes as the message length, a "32-bit message length in native byte order" (see GitHub - guest271314/NativeMessagingHosts: Native Messaging hosts); else increment incoming current length by current length.
Let incoming current message length be initialized to 0.
Read, incrementing incoming current length by current length, until incoming total length is reached.
Then reset incoming total length and incoming current length to 0.
Then exit; or, increment stream (ID) and repeat the process ad infinitum within the same loop.
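The steps above can be sketched in Rust roughly as follows. This is only an illustration under stated assumptions: the FrameTracker type and its feed method are hypothetical names, chunks are plain byte slices standing in for whatever each stream read returns, the 4 header bytes are assumed to arrive together at the start of a chunk, and a chunk is assumed never to span two messages. The header is decoded as little endian to match getUint32(0, true).

```rust
/// Tracks one length-prefixed message across arbitrarily sized reads.
/// Hypothetical helper mirroring the two counters in the JavaScript server.
#[derive(Default)]
struct FrameTracker {
    total: u32,    // expected payload length; 0 means "waiting for a header"
    received: u32, // payload bytes counted so far
}

impl FrameTracker {
    /// Feeds one chunk and returns true once the current message is complete.
    /// Panics if a header chunk is shorter than 4 bytes (see assumptions above).
    fn feed(&mut self, mut chunk: &[u8]) -> bool {
        if self.total == 0 && self.received == 0 {
            // First chunk of a message: the first 4 bytes are the length.
            let (header, rest) = chunk.split_at(4);
            self.total = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
            chunk = rest;
        }
        self.received += chunk.len() as u32;
        if self.received == self.total {
            // Message complete: reset both counters for the next message.
            self.total = 0;
            self.received = 0;
            return true;
        }
        false
    }
}

fn main() {
    let payload = vec![7u8; 10];
    // First chunk carries the header plus the first 3 payload bytes.
    let mut first = (payload.len() as u32).to_le_bytes().to_vec();
    first.extend_from_slice(&payload[..3]);

    let mut tracker = FrameTracker::default();
    println!("{}", tracker.feed(&first));         // false: 3 of 10 bytes
    println!("{}", tracker.feed(&payload[3..8])); // false: 8 of 10 bytes
    println!("{}", tracker.feed(&payload[8..]));  // true: 10 of 10 bytes
}
```

When the reader supports read_exact, that call does this bookkeeping internally, so a tracker like this is mainly useful when the data arrives as discrete chunks.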

Except that it's not JSON over IPC; I'm implementing the Native Messaging protocol, presently, over HTTP/3 with WebTransport.

Isn't that what I described? I was saying you can't do that with a read_to_end, because that waits for the end!

I already got the read-the-u32 part, using the code from SO.

Now I'm trying to figure out how to implement the algorithm I described in Rust.

Create a couple variables that hold integers in the form of u32, one being incrementable.

If both are 0, read 4 bytes and set the value of first variable to that integer. Else increment second integer until first variable value is reached.

I might be missing something then, because keeping track of how much has been read and stopping when it reaches a certain value is exactly what read_exact does.

Oh. I started with example Rust code. I already did this in JavaScript using WebSocket, TCP, UDP, HTTP/2, and a JavaScript WebTransport implementation. I'm not a Rustacean. Just trying to learn the language in practice.

So read_exact after getting the initial length. Thanks. I'll try some stuff.

Not your question, but why not use:

let header = Uint8Array.from(
  { length: 4 },
  (_, i) => (data.length >> (i * 8)) & 0xff
);

instead of the conversion through u32s?

If I understand your question correctly, because in general Uint8Arrays are expected/used to pass to WHATWG Streams WritableStreamDefaultWriter.

Both snippets produce the exact same Uint8Array value.
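For comparison on the Rust side, here is a small sketch showing that the same shift-and-mask computation yields the same four bytes as the built-in u32::to_le_bytes. The function name header_by_shifting is hypothetical and exists only to mirror the JavaScript callback.

```rust
/// Builds the 4-byte header with the same shift-and-mask approach as the
/// JavaScript snippets. Illustrative only; `to_le_bytes` does this directly.
fn header_by_shifting(length: u32) -> [u8; 4] {
    let mut header = [0u8; 4];
    for (index, byte) in header.iter_mut().enumerate() {
        // Byte 0 is the least significant byte, so the result is little endian.
        *byte = ((length >> (index * 8)) & 0xff) as u8;
    }
    header
}

fn main() {
    let length: u32 = 1024 * 1024 * 20; // 20971520, the 20 MB payload size
    let header = header_by_shifting(length);

    // Both approaches give the same little-endian bytes.
    assert_eq!(header, length.to_le_bytes());
    // Decoding them gives back the original length.
    assert_eq!(u32::from_le_bytes(header), length);
    println!("{:?}", header); // [0, 0, 64, 1]
}
```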

In general WHATWG Streams expect a Uint8Array to be passed to WritableStreamDefaultWriter.write() and/or ReadableStreamDefaultController.enqueue().

Not sure what else I can say?

Have you ever used fetch() in the browser you are running? When you use Response.bytes() or Response.body.pipeTo(new WritableStream()) you are more than likely going to get a series of Uint8Arrays.

Really really not sure what else I can say?