General advice: Actor-based connections

I am trying to write a reverse proxy as a toy project to learn more about async/await in Rust. I wound up deciding to use Actix Actors for most of the work, but I am confused about some pretty general things (I also did this for a hackathon, so my decision-making was not at its finest).

For example, I want a top-level Actor that, when it starts, binds a tokio::net::TcpListener to a port and, on each incoming connection, reads an HTTP request from that connection. From there, it would send the request to an AppAgent for processing.
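To make the intended flow concrete, here is a minimal sketch of the same shape without Actix, using only std threads and a channel in place of actors. The names IncomingRequest, accept_loop and demo are placeholders I made up for illustration, and it reads just the request line instead of parsing a full HTTP request, so treat it as a rough outline of the data flow rather than working proxy code.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical message handed to the "AppAgent" side of the channel.
#[derive(Debug)]
pub struct IncomingRequest {
    pub peer: SocketAddr,
    pub request_line: String,
}

/// Accepts `max_conns` connections, reads the first line of each on its own
/// thread, and forwards it to the agent through the channel.
pub fn accept_loop(
    listener: TcpListener,
    agent: Sender<IncomingRequest>,
    max_conns: usize,
) -> std::io::Result<()> {
    for _ in 0..max_conns {
        let (stream, peer) = listener.accept()?;
        // Each connection gets its own sender handle, so nothing borrowed
        // from the acceptor has to outlive this loop iteration.
        let agent = agent.clone();
        thread::spawn(move || {
            let mut reader = BufReader::new(stream);
            let mut line = String::new();
            if reader.read_line(&mut line).is_ok() {
                let request_line = line.trim_end().to_string();
                // A send error only means the agent has gone away; ignore it.
                let _ = agent.send(IncomingRequest { peer, request_line });
            }
        });
    }
    Ok(())
}

/// Binds to an ephemeral localhost port, sends one request to itself, and
/// returns the request line the "agent" end of the channel received.
pub fn demo() -> std::io::Result<String> {
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
    let addr = listener.local_addr()?;
    let (tx, rx) = mpsc::channel();
    let acceptor = thread::spawn(move || accept_loop(listener, tx, 1));

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")?;

    let req = rx.recv().expect("agent channel closed before a request arrived");
    acceptor.join().expect("acceptor thread panicked")?;
    Ok(req.request_line)
}
```

Calling demo() from a main function should hand back the request line that the agent side received. The point of the sketch is that the listener and each stream are moved into whatever owns them, and only a cloneable sender is shared, which is the property I am struggling to get with the actor context.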

Here's my code:

impl StreamHandler<Option<(TcpStream, SocketAddr)>> for HttpActor {
    fn handle(&mut self, item: Option<(TcpStream, SocketAddr)>, ctx: &mut Self::Context) {
        match item {
            None => ctx.notify(HttpActorShutdown::PollAcceptError),
            Some((stream, addr)) => {
                let ctx_ptr = Rc::new(RefCell::new(*ctx));
                ctx.add_stream(futures_util::stream::once( /* ??? */  ));
            }
        }
    }
}

pub type IoResult<T> = Result<T, std::io::Error>;
impl StreamHandler<IoResult<TcpListener>> for HttpActor {
    fn handle(&mut self, item: IoResult<TcpListener>, ctx: &mut Self::Context) {
        if let std::result::Result::Ok(listener) = item {
            ctx.add_stream(futures_util::stream::poll_fn(move |ctx| {
                match listener.poll_accept(ctx) {
                    Poll::Ready(result) => match result {
                        std::result::Result::Ok(incoming) => Poll::Ready(Some(Some(incoming))),
                        Err(e) => Poll::Ready(Some(None)),
                    },
                    _ => Poll::Pending,
                }
            }));
        }
    }
}

impl Actor for HttpActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        ctx.add_stream(futures_util::stream::once(TcpListener::bind((
            Ipv4Addr::LOCALHOST,
            self.config.http_port.unwrap_or(80),
        ))));
    }
}

The two biggest concerns I have are:

  • In the IoResult<TcpListener> handler, it seems that I can't get the actor to repeatedly call for accepted connections without a Clone closure, but the context is obviously not going to be cloneable.

  • In the Option<(TcpStream, SocketAddr)> handler, I spent a lot of time trying to make an Executor that could drive a hyper::serve_connection_with_upgrades, but again it appears that the context being behind a reference prevents this.

How do I manage these kinds of persistent network resources effectively in an Actor model? Is this just the completely wrong way to go about it, or are there "local" changes I could make to get this working?
