I want to asynchronously read a stream of bytes in which each "packet" is preceded by a u32 giving its size, then read the packet and deserialize it from a specific format.
For example, here is code that reads JSON in a loop:
use {
    serde::Deserialize,
    serde_json::value::RawValue,
    std::{io, pin::Pin},
    tokio::io::{AsyncRead, AsyncReadExt},
};

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let non_cancelled = true;
    tokio::spawn(async move {
        let mut dec = JsonDecoder::default();
        while non_cancelled {
            let json: &RawValue = dec.decode(&mut read).await?;
            println!("{json}");
        }
        Ok::<_, io::Error>(())
    });
}

#[derive(Default)]
pub struct JsonDecoder {
    buf: Vec<u8>,
}

impl JsonDecoder {
    pub async fn decode<'d, T, R>(&'d mut self, read: &mut R) -> Result<T, io::Error>
    where
        T: Deserialize<'d>,
        R: AsyncRead + Unpin,
    {
        let len = read.read_u32_le().await?;
        self.buf.clear();
        self.buf.resize(len as usize, 0);
        read.read_exact(&mut self.buf).await?;
        let val = serde_json::from_slice(&self.buf)?;
        Ok(val)
    }
}
I was surprised that this code worked and did not complain about lifetimes, even though I ran it on a multi-threaded runtime. I love the T: Deserialize<'d> bound, which lets me avoid unnecessary allocations and copying.
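The borrowing pattern behind that bound can be shown in isolation. The following is a minimal synchronous sketch using only std, with no serde or tokio; `StrDecoder` and `decode_all` are hypothetical names made up for illustration, and a `&str` stands in for the deserialized value.

```rust
use std::io::{self, Read};

/// Hypothetical std-only stand-in for the decoder: it reuses one buffer
/// and hands out a `&str` that borrows from it instead of allocating.
#[derive(Default)]
pub struct StrDecoder {
    buf: Vec<u8>,
}

impl StrDecoder {
    /// Reads a little-endian `u32` length, then that many bytes, and
    /// returns them as a string slice tied to the `&'d mut self` borrow.
    pub fn decode<'d, R: Read>(&'d mut self, read: &mut R) -> io::Result<&'d str> {
        let mut len = [0u8; 4];
        read.read_exact(&mut len)?;
        let len = u32::from_le_bytes(len) as usize;

        self.buf.clear();
        self.buf.resize(len, 0);
        read.read_exact(&mut self.buf)?;

        std::str::from_utf8(&self.buf)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
}

/// Decodes every packet in `bytes` and collects owned copies for inspection.
pub fn decode_all(mut bytes: &[u8]) -> io::Result<Vec<String>> {
    let mut dec = StrDecoder::default();
    let mut out = Vec::new();
    while !bytes.is_empty() {
        // The borrow of `dec` ends once `packet` has been copied, so the
        // next iteration is free to overwrite the buffer.
        let packet = dec.decode(&mut bytes)?;
        out.push(packet.to_owned());
    }
    Ok(out)
}
```

The returned slice cannot outlive the next call to `decode`, which is the same constraint the `'d` lifetime expresses in the async version.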
But this decoder only works with JSON. I decided to generalize it to support other formats, for example bincode (I also didn't want to go through serde there, but to use bincode's native Encode/Decode traits).
I rewrote the decoder to use a generic implementation:
use {
    serde::Deserialize,
    serde_json::value::RawValue,
    std::{io, pin::Pin},
    tokio::io::{AsyncRead, AsyncReadExt},
};

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let cancelled = true;
    tokio::spawn(async move {
        let mut dec = Decoder::<Json>::default();
        while cancelled {
            let json: &RawValue = dec.decode(&mut read).await?;
            println!("{json}");
        }
        Ok::<_, io::Error>(())
    });
}

pub trait Decode<'d, T> {
    fn decode(&self, buf: &'d [u8]) -> Result<T, io::Error>;
}

#[derive(Default)]
pub struct Decoder<D> {
    buf: Vec<u8>,
    dec: D,
}

impl<D> Decoder<D> {
    pub async fn decode<'d, T, R>(&'d mut self, read: &mut R) -> Result<T, io::Error>
    where
        D: Decode<'d, T>,
        R: AsyncRead + Unpin,
    {
        let len = read.read_u32_le().await?;
        self.buf.clear();
        self.buf.resize(len as usize, 0);
        read.read_exact(&mut self.buf).await?;
        let val = self.dec.decode(&self.buf)?;
        Ok(val)
    }
}

#[derive(Default)]
struct Json;

impl<'d, T> Decode<'d, T> for Json
where
    T: Deserialize<'d>,
{
    fn decode(&self, buf: &'d [u8]) -> Result<T, io::Error> {
        serde_json::from_slice(buf).map_err(io::Error::from)
    }
}
And when I did this in my actual code, everything broke with an error:
implementation of `Decode` is not general enough
But when I wrote a minimal example for this post, to my surprise everything worked! That still didn't help me understand how to fix my problem; my minimal example was evidently missing something from the actual code. As far as I can tell, the difference is that in the real code I had also added an associated type to the Decode trait for a custom error type:
pub trait Decode<'d, T> {
    type Error: From<io::Error>;
    fn decode(&self, buf: &'d [u8]) -> Result<T, Self::Error>;
}
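The role of the `From<io::Error>` bound is that `?` can turn an I/O failure into whatever error type the format declares. Here is a minimal synchronous, std-only sketch of that mechanism; the names `Parse`, `Utf8`, `TextError` and `read_frame` are hypothetical and exist only for this illustration, and the sketch does not involve `tokio::spawn`.

```rust
use std::{io, str};

/// Hypothetical trait with the same shape as `Decode`, using only std.
pub trait Parse<'d, T> {
    type Error: From<io::Error>;
    fn parse(&self, buf: &'d [u8]) -> Result<T, Self::Error>;
}

/// Hypothetical error type combining I/O and format failures.
#[derive(Debug)]
pub enum TextError {
    Io(io::Error),
    Utf8(str::Utf8Error),
}

impl From<io::Error> for TextError {
    fn from(v: io::Error) -> Self {
        Self::Io(v)
    }
}

impl From<str::Utf8Error> for TextError {
    fn from(v: str::Utf8Error) -> Self {
        Self::Utf8(v)
    }
}

/// Hypothetical format: the packet is a UTF-8 string borrowed from the buffer.
pub struct Utf8;

impl<'d> Parse<'d, &'d str> for Utf8 {
    type Error = TextError;
    fn parse(&self, buf: &'d [u8]) -> Result<&'d str, TextError> {
        Ok(str::from_utf8(buf)?)
    }
}

/// Reads one length-prefixed frame into `buf`, then parses it with `p`.
pub fn read_frame<'d, T, P, R>(p: &P, buf: &'d mut Vec<u8>, read: &mut R) -> Result<T, P::Error>
where
    P: Parse<'d, T>,
    R: io::Read,
{
    let mut len = [0u8; 4];
    // `?` converts `io::Error` into `P::Error` through the `From` bound.
    read.read_exact(&mut len)?;
    buf.clear();
    buf.resize(u32::from_le_bytes(len) as usize, 0);
    read.read_exact(buf.as_mut_slice())?;
    p.parse(buf.as_slice())
}
```

A truncated input surfaces as `TextError::Io`, while invalid UTF-8 surfaces as `TextError::Utf8`, so the caller sees a single error type for both stages.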
And by adding a custom error to the example, I finally got the error:
error: implementation of `Decode` is not general enough
  --> src/lib.rs:12:5
   |
12 | /     tokio::spawn(async move {
13 | |         let mut dec = Decoder::<Json>::default();
14 | |         while cancelled {
15 | |             let json: &RawValue = dec.decode(&mut read).await?;
...  |
19 | |         Ok::<_, JsonError>(())
20 | |     });
   | |______^ implementation of `Decode` is not general enough
   |
   = note: `Json` must implement `Decode<'0, &'1 serde_json::value::RawValue>`, for any two lifetimes `'0` and `'1`...
   = note: ...but it actually implements `Decode<'2, &serde_json::value::RawValue>`, for some specific lifetime `'2`
Full example:
use {
    serde::Deserialize,
    serde_json::value::RawValue,
    std::{io, pin::Pin},
    tokio::io::{AsyncRead, AsyncReadExt},
};

async fn _task() {
    // read bytes from some async reader file/socket/etc
    let mut read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(&[][..]);
    let cancelled = true;
    tokio::spawn(async move {
        let mut dec = Decoder::<Json>::default();
        while cancelled {
            let json: &RawValue = dec.decode(&mut read).await?;
            println!("{json}");
        }
        Ok::<_, JsonError>(())
    });
}

pub trait Decode<'d, T> {
    type Error: From<io::Error>;
    fn decode(&self, buf: &'d [u8]) -> Result<T, Self::Error>;
}

#[derive(Default)]
pub struct Decoder<D> {
    buf: Vec<u8>,
    dec: D,
}

impl<D> Decoder<D> {
    pub async fn decode<'d, T, R>(&'d mut self, read: &mut R) -> Result<T, D::Error>
    where
        D: Decode<'d, T>,
        R: AsyncRead + Unpin,
    {
        let len = read.read_u32_le().await?;
        self.buf.clear();
        self.buf.resize(len as usize, 0);
        read.read_exact(&mut self.buf).await?;
        let val = self.dec.decode(&self.buf)?;
        Ok(val)
    }
}

#[derive(Default)]
struct Json;

impl<'d, T> Decode<'d, T> for Json
where
    T: Deserialize<'d>,
{
    type Error = JsonError;
    fn decode(&self, buf: &'d [u8]) -> Result<T, Self::Error> {
        serde_json::from_slice(buf).map_err(JsonError::from)
    }
}

enum JsonError {
    Json(serde_json::Error),
    Io(io::Error),
}

impl From<serde_json::Error> for JsonError {
    fn from(v: serde_json::Error) -> Self {
        Self::Json(v)
    }
}

impl From<io::Error> for JsonError {
    fn from(v: io::Error) -> Self {
        Self::Io(v)
    }
}
Well, while writing this post, more questions came up.
I have no idea what this error means or why it appears. Why does everything work in one case but break in the other? Why can't I use an associated type? Please help me figure it out.