Hi everyone !
In my trait for servers I faced with issue related to AsyncBufRead
. The trait code:
#[async_trait]
pub trait BaseServer: Send {
async fn start(port: u16) -> anyhow::Result<()> {
let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await?;
let mut connections: Vec<Arc<OwnedWriteHalf>> = vec![];
loop {
tokio::select! {
Ok((socket, addr)) = listener.accept() => {
let (input_sender, input_receiver) = mpsc::channel::<Packet>(BUFFER_SIZE);
let (output_sender, output_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
let (rx, tx) = socket.into_split();
let reader = Arc::new(BufReader::new(rx));
let writer = Arc::new(tx);
let connection = Arc::new(Connection::default());
connections.push(writer.clone());
tokio::spawn(async move {
join_all(vec![
Self::handle_read(input_sender, reader, connection.clone()),
Self::handle_input(input_receiver, output_sender, connection.clone()),
Self::handle_write(output_receiver, writer, connection.clone()),
]).await;
});
},
_ = signal::ctrl_c() => {
for mut writer in connections.into_iter() {
writer.shutdown().await?;
}
break;
},
}
}
Ok(())
}
async fn handle_read<T>(
input_sender: Sender<T>,
reader: Arc<BufReader<OwnedReadHalf>>,
connection: Arc<Connection>,
) -> JoinHandle<anyhow::Result<()>>;
async fn handle_input<T>(
mut input_receiver: Receiver<T>,
output_sender: Sender<Vec<u8>>,
connection: Arc<Connection>,
) -> JoinHandle<anyhow::Result<()>>;
async fn handle_write(
mut output_receiver: Receiver<Vec<u8>>,
writer: Arc<OwnedWriteHalf>,
connection: Arc<Connection>,
) -> JoinHandle<anyhow::Result<()>>;
// ...
}
I face with the issue when implement the handle_read
method:
#[async_trait]
impl BaseServer for LoginServer {
async fn handle_read<T>(
input_sender: Sender<T>,
mut reader: Arc<BufReader<OwnedReadHalf>>,
_: Arc<Connection>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
loop {
let opcode = reader.read_u8().await?;
let data = match opcode {
Opcode::LOGIN_CHALLENGE => {
LoginChallengeIncoming::from_stream(&mut reader).await?
}
Opcode::LOGIN_PROOF => LoginProofIncoming::from_stream(&mut reader).await?,
Opcode::REALM_LIST => RealmlistIncoming::from_stream(&mut reader).await?,
_ => vec![],
};
if !data.is_empty() {
input_sender.send(data).await?;
}
}
})
}
// ...
}
and the error is:
Trait `AsyncBufRead` is not implemented for `Arc<BufReader<OwnedReadHalf>>`
for example, it happens at this line:
LoginChallengeIncoming::from_stream(&mut reader).await?
Could somebody explain why this happen and how to fix ?
UPDATED: for example, if I add this line let mut reader = BufReader::new(reader.get_mut());
this will fix the issue, but it seems to be a dirty workaround.