Help Needed: Simplifying Async Connection Implementation in Rust
Hello everyone,
I'm working on a Rust project that involves creating a connection type and accompanying components like an endpoint, listener, server, and app types. However I am struggling to design suitable abstractions in async code.
However I am struggling with choosing the right abstractions. Innitially I tried using a RW: AsyncRead + AsyncWrite + Sync + Send
abstractions.
Here is my current implementation of the Connection
struct:
#[pin_project::pin_project]
pub struct Connection<RW: AsyncRead + AsyncWrite + Send + Sync> {
#[pin]
stream: RW,
}
A Listener
is responsible for
/// Listener are created bying calling `Bindable::bind`, they are responsible for listening for and
/// accepting connections.
#[async_trait]
pub trait Listener<RW: AsyncRead + AsyncWrite + Send + Sync + Unpin> {
async fn accept(&self) -> std::result::Result<(Connection<RW>, SocketAddr), Box<dyn std::error::Error + Send + Sync>>;
}
/// constructs Listeners
#[async_trait]
pub trait Bindable {
type Listener: Listener<Self::RW>;
type RW: AsyncRead + AsyncWrite + Send + Sync + Unpin;
async fn bind(&self) -> std::result::Result<(Self::Listener, SocketAddr), Box<dyn std::error::Error + Send + Sync>>;
}
/// Endpoints terminate client connections and hands them off to incoming connection channel.
#[derive(Debug)]
pub struct Endpoint<B: Bindable + Send + Sync> {
address: SocketAddr,
incoming_connection_tx: Sender<Connection<B::RW>>,
bindable: B
}
impl<B: Bindable + Send + Sync> Display for Endpoint<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Endpoint {{ address = {} }}", self.address)
}
}
impl<B: Bindable + Send + Sync> Endpoint<B> {
pub fn new(address: SocketAddr, incoming_connection_tx: Sender<Connection<B::RW>>, bindable: B) -> Self {
Self {
address,
incoming_connection_tx,
bindable
}
}
pub async fn run(&self) {
let (listener, _) = self.bindable.bind().await.unwrap();
loop {
let (conn, _) = listener.accept().await.unwrap();
if self.incoming_connection_tx.send(conn).await.is_err() {
// Trace error
panic!("error sending connection from endpoint")
}
}
}
}
However now I am implementing these interfaces and I am struggling with the Endpoint
-> Listener
-> Connection
interaction:
I implemented a simple impl Listener for TcpConnectionListener
:
struct TcpConnectionListener(TcpListener);
#[async_trait]
impl Listener<TcpStream> for TcpConnectionListener {
async fn accept(
&self,
) -> std::result::Result<
(Connection<TcpStream>, SocketAddr),
Box<dyn std::error::Error + Send + Sync>,
> {
self.0
.accept()
.await
.map(|(stream, addr)| (Connection::from(stream), addr))
.map_err(|e| e.into())
}
}
A impl Bindable for TcpConnectionBuilder
struct TcpConnectionBuilder;
#[async_trait]
impl Bindable for TcpConnectionBuilder {
type Listener = TcpConnectionListener;
type RW = TcpStream;
async fn bind(
&self,
) -> std::result::Result<(Self::Listener, SocketAddr), Box<dyn std::error::Error + Send + Sync>> {
TcpListener::bind(addr)
}
}
The issues I'm facing include:
TcpConnectionListener.accept
returnsResult<Connection<TcpStream>>
and is not generic overTcpStream
, requiring me to declaretype RW = TcpStream
.- Extensive use of
async_trait
and boxing errors. - Passing generics from
Connection<RW>
everywhere.
I noticed that Cloudflare's pingora
codebase used an enum of connection types rather than trying to be generic over all AsyncRead + AsyncWrite + Send + Sync
. However such an implementation requires abstracting many additional types and lots of match statements.
Where should I draw the line between using trait objects, enums, and generics? Can anyone help me design an alternative set of abstractions for connections? I'm suffering from design paralysis.
Thank you in advance!