Returning multiple 'views' of the same AsyncRead

Hello there!

I'm writing a library (roughly) à la Google's ProtoBuf that, in contrast to ProtoBuf, supports streaming parts of the message.

I've made great progress in implementing small Encoder, Decoder, Encodable and Decodable traits, but there's one giant issue where I'm stuck.

Currently I want to support two views that can be streamed:

  • A Blob for raw bytes
  • A Readable<T> for a sequence of nested messages T

We'll focus on the Blob because the issue is identical.

pub struct Blob<'a> {
    size: u64,
    stream: Arc<Mutex<dyn futures::stream::Stream<Item = std::result::Result<bytes::BytesMut, std::io::Error>> + 'a>>,
}

In Encoder, the consumer can, for example, pipe a file into a stream, build a message containing a Blob that wraps that stream, and hand that object to my encoder, which consumes the Blob. This works wonderfully.

In Decoder, I need to return 'views' on the underlying AsyncRead. Currently, I return futures::stream::Stream<Item = std::result::Result<bytes::BytesMut, std::io::Error>>.

// Example message
// #[derive(Encode, Decode)] // in a later stage
pub struct AddFile<'a> {
    path: String,
    content: Blob<'a>,
}

This is where I construct my Blob when reading a message:

impl <R: AsyncRead + Unpin> Decoder for AsyncReadDecoder<R> {

    async fn decode_blob(&mut self) -> Result<Blob<'_>> {
        let size = self.decode_u64().await?;
        let reader: FramedRead<&mut R, BytesCodec> = FramedRead::new(&mut self.reader, BytesCodec::new());
        Ok(Blob {
            size,
            stream: Arc::new(Mutex::new(reader)),
        })
    }

}

Testing the decoder on AddFile:

impl <'a> Decodable for AddFile<'a> {
    async fn decode<D: Decoder>(decoder: &mut D) -> Result<Self> {
        Ok(AddFile {
            path: decoder.decode().await?,
            content: decoder.decode_blob().await?,
        })
    }
}

I get this lifetime related error:

error: lifetime may not live long enough
   --> src/lib.rs:332:13
    |
330 |       impl <'a> Decodable for AddFile<'a> {
    |             -- lifetime `'a` defined here
331 |           async fn decode<D: Decoder>(decoder: &mut D) -> Result<Self> {
    |                                                - let's call the lifetime of this reference `'1`
332 | /             Ok(AddFile {
333 | |                 path: decoder.decode().await?,
334 | |                 content: decoder.decode_blob().await?,
335 | |             })
    | |______________^ associated function was supposed to return data with lifetime `'a` but it is returning data with lifetime `'1`

I think I get the issue: this doesn't work because returning a view of a &mut AsyncRead would give two mutable references to the same AsyncRead. My question is, then: how do you solve this? Is there some kind of pattern I'm missing? Or is this genuinely difficult to achieve in Rust?

Thanks,
Sam

The error arises because this function takes an &'1 mut D and produces an AddFile<'a>, and '1 and 'a are unrelated. They need to match, because the Blob<'a> inside AddFile<'a> holds a borrow derived from the &'a mut D. Change the signature so that there is no '1 lifetime, just 'a (since decode is a trait method, the Decodable trait declaration has to expose that lifetime too, or the impl won't match it):

impl <'a> Decodable for AddFile<'a> {
    async fn decode<D: Decoder>(decoder: &'a mut D) -> Result<Self> {
        // body unchanged
    }
}

In general, whenever you have a type with a lifetime parameter, the constructor function(s) for that struct must have a signature that involves that lifetime in the input, too (unless the lifetime isn’t being used in that case).
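To make that rule concrete, here is a minimal sketch of the same shape using only the standard library. It is not the original library's code: synchronous std::io::Read stands in for AsyncRead, and ReadDecoder, decode_string, the little-endian length-prefixed wire format, the encode_add_file and demo helpers, and the lifetime parameter on Decodable are all assumptions made for illustration. The point is only that the 'a in AddFile<'a> also appears on the decoder argument.

```rust
use std::io::{self, Cursor, Read};

/// A view over the next `size` bytes of the decoder's reader.
pub struct Blob<'a> {
    size: u64,
    reader: Box<dyn Read + 'a>,
}

pub trait Decoder {
    fn decode_u64(&mut self) -> io::Result<u64>;
    fn decode_string(&mut self) -> io::Result<String>;
    // The elided lifetime ties the returned Blob to the borrow of `self`.
    fn decode_blob(&mut self) -> io::Result<Blob<'_>>;
}

/// Hypothetical synchronous stand-in for AsyncReadDecoder.
pub struct ReadDecoder<R> {
    reader: R,
}

impl<R: Read> Decoder for ReadDecoder<R> {
    fn decode_u64(&mut self) -> io::Result<u64> {
        let mut buf = [0u8; 8];
        self.reader.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf)) // assumed wire format: little-endian
    }

    fn decode_string(&mut self) -> io::Result<String> {
        let len = self.decode_u64()?;
        let mut buf = vec![0u8; len as usize];
        self.reader.read_exact(&mut buf)?;
        String::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }

    fn decode_blob(&mut self) -> io::Result<Blob<'_>> {
        let size = self.decode_u64()?;
        // Reborrow the reader and cap the view at `size` bytes.
        let view = self.reader.by_ref().take(size);
        Ok(Blob { size, reader: Box::new(view) })
    }
}

// The trait carries the lifetime so that impls can name it on the argument.
pub trait Decodable<'a>: Sized {
    fn decode<D: Decoder>(decoder: &'a mut D) -> io::Result<Self>;
}

pub struct AddFile<'a> {
    path: String,
    content: Blob<'a>,
}

impl<'a> Decodable<'a> for AddFile<'a> {
    fn decode<D: Decoder>(decoder: &'a mut D) -> io::Result<Self> {
        Ok(AddFile {
            path: decoder.decode_string()?,  // short reborrow, ends here
            content: decoder.decode_blob()?, // reborrow lasting for 'a
        })
    }
}

/// Hypothetical helper: builds the bytes the decoder above expects.
fn encode_add_file(path: &str, content: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(&(path.len() as u64).to_le_bytes());
    out.extend_from_slice(path.as_bytes());
    out.extend_from_slice(&(content.len() as u64).to_le_bytes());
    out.extend_from_slice(content);
    out
}

/// Decodes one message and drains its blob.
pub fn demo() -> io::Result<(String, u64, Vec<u8>)> {
    let bytes = encode_add_file("notes.txt", b"hello");
    let mut decoder = ReadDecoder { reader: Cursor::new(bytes) };
    let mut msg = AddFile::decode(&mut decoder)?;
    let mut content = Vec::new();
    msg.content.reader.read_to_end(&mut content)?;
    Ok((msg.path, msg.content.size, content))
}
```

One consequence of this design: as long as the AddFile value is alive, the decoder stays mutably borrowed, so the message has to be consumed or dropped before the next one can be decoded from the same decoder.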

Thanks, I'll try the solution (hopefully) tomorrow!
