Trait `AsyncBufRead` is not implemented for `Arc<BufReader<OwnedReadHalf>>`

Hi everyone !

In my trait for servers I faced with issue related to AsyncBufRead. The trait code:

#[async_trait]
pub trait BaseServer: Send {
    async fn start(port: u16) -> anyhow::Result<()> {
        let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await?;

        let mut connections: Vec<Arc<OwnedWriteHalf>> = vec![];

        loop {
            tokio::select! {
                Ok((socket, addr)) = listener.accept() => {
                    let (input_sender, input_receiver) = mpsc::channel::<Packet>(BUFFER_SIZE);
                    let (output_sender, output_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
                    let (rx, tx) = socket.into_split();

                    let reader = Arc::new(BufReader::new(rx));
                    let writer = Arc::new(tx);

                    let connection = Arc::new(Connection::default());
                    connections.push(writer.clone());

                    tokio::spawn(async move {
                        join_all(vec![
                            Self::handle_read(input_sender, reader, connection.clone()),
                            Self::handle_input(input_receiver, output_sender, connection.clone()),
                            Self::handle_write(output_receiver, writer, connection.clone()),
                        ]).await;
                    });
                },
                _ = signal::ctrl_c() => {
                    for mut writer in connections.into_iter() {
                        writer.shutdown().await?;
                    }

                    break;
                },
            }
        }

        Ok(())
    }

    async fn handle_read<T>(
        input_sender: Sender<T>,
        reader: Arc<BufReader<OwnedReadHalf>>,
        connection: Arc<Connection>,
    ) -> JoinHandle<anyhow::Result<()>>;

    async fn handle_input<T>(
        mut input_receiver: Receiver<T>,
        output_sender: Sender<Vec<u8>>,
        connection: Arc<Connection>,
    ) -> JoinHandle<anyhow::Result<()>>;

    async fn handle_write(
        mut output_receiver: Receiver<Vec<u8>>,
        writer: Arc<OwnedWriteHalf>,
        connection: Arc<Connection>,
    ) -> JoinHandle<anyhow::Result<()>>;

    // ...
}

I face with the issue when implement the handle_read method:

#[async_trait]
impl BaseServer for LoginServer {
    async fn handle_read<T>(
        input_sender: Sender<T>,
        mut reader: Arc<BufReader<OwnedReadHalf>>,
        _: Arc<Connection>,
    ) -> JoinHandle<anyhow::Result<()>> {
        tokio::spawn(async move {
            loop {
                let opcode = reader.read_u8().await?;

                let data = match opcode {
                    Opcode::LOGIN_CHALLENGE => {
                        LoginChallengeIncoming::from_stream(&mut reader).await?
                    }
                    Opcode::LOGIN_PROOF => LoginProofIncoming::from_stream(&mut reader).await?,
                    Opcode::REALM_LIST => RealmlistIncoming::from_stream(&mut reader).await?,
                    _ => vec![],
                };

                if !data.is_empty() {
                    input_sender.send(data).await?;
                }
            }
        })
    }

    // ...
}

and the error is:

Trait `AsyncBufRead` is not implemented for `Arc<BufReader<OwnedReadHalf>>`

for example, it happens at this line:

LoginChallengeIncoming::from_stream(&mut reader).await?

Could somebody explain why this happen and how to fix ?

UPDATED: for example, if I add this line let mut reader = BufReader::new(reader.get_mut()); this will fix the issue, but it seems to be a dirty workaround.

You need mutable access to read from a BufReader, but an Arc only provides immutable access to its contents.

Why is it inside an Arc? The fix is almost certainly to not store it in an Arc.

1 Like

Thank you for the explanation ! It was misunderstanding from my side.