Can't create a generic decoder in the async runtime

I want to asynchronously read a stream of bytes separated by u32s indicating the size of one "packet", then read the packet and deserialize from specific format.

For example, a code that reads json in a loop:

use {
    serde::Deserialize,
    serde_json::value::RawValue,
    std::{io, pin::Pin},
    tokio::io::{AsyncRead, AsyncReadExt},
};

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let non_cancelled = true;
    tokio::spawn(async move {
        let mut dec = JsonDecoder::default();
        while non_cancelled {
            let json: &RawValue = dec.decode(&mut read).await?;
            println!("{json}");
        }

        Ok::<_, io::Error>(())
    });
}

#[derive(Default)]
pub struct JsonDecoder {
    buf: Vec<u8>,
}

impl JsonDecoder {
    pub async fn decode<'d, T, R>(&'d mut self, read: &mut R) -> Result<T, io::Error>
    where
        T: Deserialize<'d>,
        R: AsyncRead + Unpin,
    {
        let len = read.read_u32_le().await?;
        self.buf.clear();
        self.buf.resize(len as usize, 0);
        read.read_exact(&mut self.buf).await?;
        let val = serde_json::from_slice(&self.buf)?;
        Ok(val)
    }
}

playground

I was surprised that this code worked and did not complain about lifetimes, despite the fact that I ran it in a multi-threaded runtime. I love this T: Deserialize<'d> which allows me to get rid of unnecessary allocations and copying.

But this decoder only works with json. I decided to generalize the decoder to be able to support other formats, for example bincode (also I didn’t want to use serde, but use native bincode's Encode/Decode traits).

I rewrote the decoder to use a generic implementation:

use {
    serde::Deserialize,
    serde_json::value::RawValue,
    std::{io, pin::Pin},
    tokio::io::{AsyncRead, AsyncReadExt},
};

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let cancelled = true;
    tokio::spawn(async move {
        let mut dec = Decoder::<Json>::default();
        while cancelled {
            let json: &RawValue = dec.decode(&mut read).await?;
            println!("{json}");
        }

        Ok::<_, io::Error>(())
    });
}

pub trait Decode<'d, T> {
    fn decode(&self, buf: &'d [u8]) -> Result<T, io::Error>;
}

#[derive(Default)]
pub struct Decoder<D> {
    buf: Vec<u8>,
    dec: D,
}

impl<D> Decoder<D> {
    pub async fn decode<'d, T, R>(&'d mut self, read: &mut R) -> Result<T, io::Error>
    where
        D: Decode<'d, T>,
        R: AsyncRead + Unpin,
    {
        let len = read.read_u32_le().await?;
        self.buf.clear();
        self.buf.resize(len as usize, 0);
        read.read_exact(&mut self.buf).await?;
        let val = self.dec.decode(&self.buf)?;
        Ok(val)
    }
}

#[derive(Default)]
struct Json;

impl<'d, T> Decode<'d, T> for Json
where
    T: Deserialize<'d>,
{
    fn decode(&self, buf: &'d [u8]) -> Result<T, io::Error> {
        serde_json::from_slice(buf).map_err(io::Error::from)
    }
}

playground

And when I did this in my actual code, everything broke with an error:

implementation of `Decode` is not general enough

But when I wrote a minimal example for this post, to my surprise everything worked! But this still didn't give me any understanding of how to fix my problem. It looks like my minimal example was missing something from the actual code. As I understand, I also made an associated type in the Decode trait for a custom error type:

pub trait Decode<'d, T> {
    type Error: From<io::Error>;
    fn decode(&self, buf: &'d [u8]) -> Result<T, Self::Error>;
}

And by adding a custom error to the example, I finally got the error:

error: implementation of `Decode` is not general enough
  --> src/lib.rs:12:5
   |
12 | /     tokio::spawn(async move {
13 | |         let mut dec = Decoder::<Json>::default();
14 | |         while cancelled {
15 | |             let json: &RawValue = dec.decode(&mut read).await?;
...  |
19 | |         Ok::<_, JsonError>(())
20 | |     });
   | |______^ implementation of `Decode` is not general enough
   |
   = note: `Json` must implement `Decode<'0, &'1 serde_json::value::RawValue>`, for any two lifetimes `'0` and `'1`...
   = note: ...but it actually implements `Decode<'2, &serde_json::value::RawValue>`, for some specific lifetime `'2`

Full example:

use {
    serde::Deserialize,
    serde_json::value::RawValue,
    std::{io, pin::Pin},
    tokio::io::{AsyncRead, AsyncReadExt},
};

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let cancelled = true;
    tokio::spawn(async move {
        let mut dec = Decoder::<Json>::default();
        while cancelled {
            let json: &RawValue = dec.decode(&mut read).await?;
            println!("{json}");
        }

        Ok::<_, JsonError>(())
    });
}

pub trait Decode<'d, T> {
    type Error: From<io::Error>;
    fn decode(&self, buf: &'d [u8]) -> Result<T, Self::Error>;
}

#[derive(Default)]
pub struct Decoder<D> {
    buf: Vec<u8>,
    dec: D,
}

impl<D> Decoder<D> {
    pub async fn decode<'d, T, R>(&'d mut self, read: &mut R) -> Result<T, D::Error>
    where
        D: Decode<'d, T>,
        R: AsyncRead + Unpin,
    {
        let len = read.read_u32_le().await?;
        self.buf.clear();
        self.buf.resize(len as usize, 0);
        read.read_exact(&mut self.buf).await?;
        let val = self.dec.decode(&self.buf)?;
        Ok(val)
    }
}

#[derive(Default)]
struct Json;

impl<'d, T> Decode<'d, T> for Json
where
    T: Deserialize<'d>,
{
    type Error = JsonError;

    fn decode(&self, buf: &'d [u8]) -> Result<T, Self::Error> {
        serde_json::from_slice(buf).map_err(JsonError::from)
    }
}

enum JsonError {
    Json(serde_json::Error),
    Io(io::Error),
}

impl From<serde_json::Error> for JsonError {
    fn from(v: serde_json::Error) -> Self {
        Self::Json(v)
    }
}

impl From<io::Error> for JsonError {
    fn from(v: io::Error) -> Self {
        Self::Io(v)
    }
}

playground

Well, while writing this post, more questions came up.
I have no idea what this error is, why does it appear? Why does everything work great in one case, but break down in another? Why can't I use an associated type? Please help me figure it out

1 Like

Strangely enough, if you make a function, it works:

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let cancelled = true;
    tokio::spawn(async move {
        let mut dec = Decoder::<Json>::default();
        while cancelled {
            let json: &RawValue = do_decode(&mut dec, &mut read).await?;
            println!("{json}");
        }

        Ok::<_, JsonError>(())
    });
}

fn do_decode<'d: 'r, 'r, R>(
    decoder: &'d mut Decoder<Json>,
    read: &'r mut R,
) -> impl Future<Output = Result<&'d RawValue, JsonError>> + 'r
where
    R: AsyncRead + Unpin,
{
    decoder.decode(read)
}

Playground.

This is very weird. It smells like a-normalization-problem-in-the-hidden-future. I suggest you open an issue.

1 Like

Thanks for the advice, I'll try to do something about it.
But I still don’t understand what causes this problem.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.