Question about generators/coroutines and lifetimes

Hi, I'm looking into using genawaiter to parse bytes from a network protocol in a way that isn't tied to sync vs. async I/O, since the coroutine feature isn't yet available in stable Rust. This part is straightforward and easy to get working. I have a coroutine that accepts a stream of byte slices and produces a stream of protocol-level packets:

async fn parse_packets_co(
    input: genawaiter::rc::Gen<io::Result<&[u8]>, (), impl Future<Output = ()>>,
    co: genawaiter::rc::Co<io::Result<Packet>>,
) {
    for buf in input {
        // parse the buffer
        // yield the packet(s) if enough data is received
    }
}
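
To make the "not tied to sync vs. async I/O" idea concrete without genawaiter, here is a minimal push-style sketch using only the standard library. The wire format (one length byte followed by the payload), the `Packet` layout, and the `PacketParser` name are all assumptions made up for illustration; the real protocol is not shown in this post.

```rust
/// Hypothetical wire format, assumed only for this sketch:
/// one length byte followed by that many payload bytes.
#[derive(Debug, PartialEq)]
struct Packet(Vec<u8>);

/// Push-style parser: it never performs I/O itself, so the caller can
/// feed it from blocking or async reads alike.
#[derive(Default)]
struct PacketParser {
    /// Bytes received so far that do not yet form a complete packet.
    pending: Vec<u8>,
}

impl PacketParser {
    /// Feed one chunk of input and return every packet completed by it.
    fn push(&mut self, input: &[u8]) -> Vec<Packet> {
        self.pending.extend_from_slice(input);
        let mut packets = Vec::new();
        while let Some(&len) = self.pending.first() {
            let end = 1 + len as usize;
            if self.pending.len() < end {
                break; // wait for more data
            }
            packets.push(Packet(self.pending[1..end].to_vec()));
            self.pending.drain(..end);
        }
        packets
    }
}
```

The parser copies input into its own buffer, so it never holds on to the caller's slice after `push` returns.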

I also want to use this coroutine to consume from a generator that supplies the byte stream by repeatedly yielding a slice of the same backing buffer, which is modified after each read. I can't seem to find the right way to do this, except with some unsafe code that creates a &'static alias to the read data and passes it forward. Here's what I have now:

async fn read_blocking_gen(co: genawaiter::rc::Co<std::io::Result<&'static [u8]>>) {
    let mut stdin = io::stdin().lock();
    let mut buf = [0u8; 128];
    loop {
        let len = match stdin.read(&mut buf) {
            Ok(len) => len,
            Err(e) => {
                co.yield_(Err(e)).await;
                break;
            }
        };
        if len == 0 {
            break;
        }
        // Assume the receiver releases the slice before this is resumed
        let slice: &'static [u8] = unsafe { std::slice::from_raw_parts(buf.as_ptr(), len) };
        co.yield_(Ok(slice)).await;
    }
}

This code works for me, but I would rather replace the unsafe part with something like this:

co.yield_(Ok(&buf[..len])).await;

Obviously, yielding a slice of the original buffer can't work in the general case, since nobody knows how long the receiver needs the data before this generator modifies the underlying buffer or exits. But if I had a guarantee that the receiver no longer holds a reference once this generator is resumed, it looks like it could work.

Anyway, does anyone know the proper way to implement this kind of generator, or whether it's even possible with genawaiter? I assume that if so, it has something to do with communicating that the lifetime of the borrowed data is constrained to the yield point, so that the read code can modify the buffer again. Or is it better to do something like sending the chunk of data into the receiver coroutine and receiving it back into the generator after resuming?
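
For comparison, outside of genawaiter the "borrow is only valid until the producer is resumed" constraint can be written in safe Rust as a method that lends a slice tied to `&mut self`. This is only a sketch of that lending pattern, not a genawaiter solution; `ChunkReader`, `next_chunk`, and `collect_all` are hypothetical names.

```rust
use std::io::{self, Read};

/// Hypothetical lending reader: each chunk borrows from the reader itself,
/// so the borrow checker forces the caller to drop it before the next call.
struct ChunkReader<R> {
    inner: R,
    buf: [u8; 128],
    done: bool,
}

impl<R: Read> ChunkReader<R> {
    fn new(inner: R) -> Self {
        ChunkReader { inner, buf: [0u8; 128], done: false }
    }

    /// Returns `None` at end of input or after an error has been reported.
    fn next_chunk(&mut self) -> Option<io::Result<&[u8]>> {
        if self.done {
            return None;
        }
        match self.inner.read(&mut self.buf) {
            Ok(0) => {
                self.done = true;
                None
            }
            Ok(len) => Some(Ok(&self.buf[..len])),
            Err(e) => {
                self.done = true;
                Some(Err(e))
            }
        }
    }
}

/// Example consumer: copies every chunk out before asking for the next one.
fn collect_all<R: Read>(reader: R) -> io::Result<Vec<u8>> {
    let mut chunks = ChunkReader::new(reader);
    let mut out = Vec::new();
    while let Some(chunk) = chunks.next_chunk() {
        out.extend_from_slice(chunk?);
    }
    Ok(out)
}
```

This cannot implement `Iterator`, because `Iterator::Item` has no way to borrow from the iterator itself. That is the same limitation that gets in the way of yielding `&buf[..len]` from the generator.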

Take a look at this project.

For me, a working solution was to move the read data into the receiver and move it back after resuming, so it's no longer a simple generator. This also relies on the bytes crate to split the scratch buffer at the point where the read ends and rejoin the pair after resuming. Otherwise I could create a special type containing the scratch buffer and the number of valid bytes in it.

async fn read_blocking_gen(co: rc::Co<io::Result<BytesMut>, Option<BytesMut>>) {
    let mut stdin = io::stdin().lock();
    let mut buf = BytesMut::zeroed(128);
    loop {
        let len = match stdin.read(&mut buf) {
            Ok(len) => len,
            Err(e) => {
                co.yield_(Err(e)).await;
                break;
            }
        };
        if len == 0 {
            break;
        }
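        // Split off the filled prefix; `buf` keeps the unused tail.
        let chunk = buf.split_to(len);
        match co.yield_(Ok(chunk)).await {
            None => break,
            // Rejoin head-first so the halves are contiguous and the merge is O(1);
            // `buf.unsplit(chunk)` would append the head after the tail and copy.
            Some(mut chunk) => {
                chunk.unsplit(buf);
                buf = chunk;
            }
        }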
    }
}
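
The alternative mentioned above, a special type holding the scratch buffer plus the number of valid bytes, could look roughly like this. It is a standard-library-only sketch of the same hand-over-and-return idea, with a closure standing in for the receiver coroutine; `Chunk`, `fill`, and `pump` are hypothetical names, not part of genawaiter or bytes.

```rust
use std::io::{self, Read};

/// Hypothetical owned chunk: the scratch buffer and how much of it is valid.
struct Chunk {
    buf: Vec<u8>,
    len: usize,
}

impl Chunk {
    fn with_capacity(cap: usize) -> Self {
        Chunk { buf: vec![0u8; cap], len: 0 }
    }

    /// The bytes produced by the most recent read.
    fn data(&self) -> &[u8] {
        &self.buf[..self.len]
    }

    /// Refill from `reader`; returns `Ok(false)` at end of input.
    fn fill<R: Read>(&mut self, reader: &mut R) -> io::Result<bool> {
        self.len = reader.read(&mut self.buf)?;
        Ok(self.len > 0)
    }
}

/// Hands each chunk to `consume` by value and takes it back afterwards,
/// so the buffer is only overwritten once the consumer has returned it.
fn pump<R: Read>(mut reader: R, mut consume: impl FnMut(Chunk) -> Chunk) -> io::Result<()> {
    let mut chunk = Chunk::with_capacity(128);
    while chunk.fill(&mut reader)? {
        chunk = consume(chunk);
    }
    Ok(())
}
```

Because the chunk moves by value, no lifetime annotation is needed. The cost is that the receiver has to hand the buffer back explicitly.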

To close out this topic: this is a duplicate of the question discussed in lang-team/src/design_notes/general_coroutines.md at 401f90116f28f07fd7c4680869add68f71441a2a in rust-lang/lang-team on GitHub.
