Async callback implementation

Hello, newbie here.
I have been working with Rust on and off for the past year. I did a project where I created an API using hyper. The setup used #[tokio::main(flavor = "current_thread")] and moro_local.

Now I am trying to extend our design a bit, and I want to use async callbacks to set up handlers for my endpoints:

use bytes::Bytes;
use controller::Controller;
use http_body_util::{BodyExt, Empty, Full};
use hyper::{body, header, Response};
use hyper::{Request, StatusCode};
use std::future::Future;
use std::pin::Pin;

mod endpoints;

/// Error type used by the router's handlers and response bodies.
///
/// The enum declares no variants, so no value of this type can currently be constructed.
#[derive(thiserror::Error, Debug)]
pub enum RouterError {}

/// Boxed handler callback: takes the controller and an optional request body and
/// returns a boxed, pinned future resolving to a `HandlerResult`.
///
/// The same lifetime `'a` is used for the controller borrow, the returned future and
/// the boxed closure itself, so it is fixed once where the alias is instantiated
/// rather than being chosen anew for each call.
type AsyncCallback<'a> = Box<
    dyn Fn(
            &'a Controller,
            Option<body::Incoming>,
        ) -> Pin<Box<dyn Future<Output = HandlerResult> + 'a>>
        + 'a,
>;

/// Type-erased HTTP body yielding `Bytes` chunks, with `RouterError` as its error type.
type BoxBody = http_body_util::combinators::BoxBody<Bytes, RouterError>;

/// Result type of the handlers: an HTTP response with a `BoxBody`, or a `RouterError`.
type HandlerResult = std::result::Result<Response<BoxBody>, RouterError>;

/// Builds a response body that carries no data.
///
/// The empty `match` only type-checks because the error type of `Empty` has no values;
/// it adapts that error type to the one required by `BoxBody`.
fn empty() -> BoxBody {
    let body = Empty::<Bytes>::new();
    body.map_err(|impossible| match impossible {}).boxed()
}
/// Builds a response body containing `chunk`.
///
/// `chunk` is converted into `Bytes` first. The empty `match` only type-checks because
/// the error type of `Full` has no values; it adapts that error type to `BoxBody`'s.
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody {
    let payload: Bytes = chunk.into();
    let body = Full::new(payload);
    body.map_err(|impossible| match impossible {}).boxed()
}

/// Returns a `200 OK` response whose body is `js` and whose `Content-Type` header is
/// `application/json`.
///
/// `js` is sent as-is; this function does not check that it is valid JSON.
///
/// # Panics
///
/// Panics if the response builder reports an error (via `unwrap`).
fn resp_json_body(js: String) -> Response<BoxBody> {
    let builder = Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "application/json");
    builder.body(full(js)).unwrap()
}

/// Returns a `200 OK` response with an empty body.
///
/// # Panics
///
/// Panics if the response builder reports an error (via `unwrap`).
fn resp_ok_no_body() -> Response<BoxBody> {
    let builder = Response::builder().status(StatusCode::OK);
    builder.body(empty()).unwrap()
}

/// Router holding a borrowed controller and the callback it dispatches to.
pub struct Router<'a> {
    /// Controller passed to the callback on every call.
    ctrl: &'a controller::Controller,
    /// Handler invoked by `call`; it shares the lifetime `'a` of the controller borrow.
    cb: AsyncCallback<'a>,
}

/// Placeholder handler: prints a greeting and answers `200 OK` with an empty body.
///
/// Both arguments are ignored.
async fn test(_ctrl: &Controller, _body: Option<body::Incoming>) -> HandlerResult {
    println!("hello veloo");
    let response = resp_ok_no_body();
    Ok(response)
}

impl<'a> Router<'a> {
    /// Creates a router that borrows `ctrl` for `'a` and dispatches every call to `test`.
    pub fn new(ctrl: &'a controller::Controller) -> Self {
        Router {
            ctrl,
            // The closure parameter is annotated with the struct's own `'a`, matching
            // the single-lifetime `AsyncCallback<'a>` alias.
            cb: Box::new(|ctrl: &'a Controller, body: Option<body::Incoming>| {
                Box::pin(test(ctrl, body))
            }),
        }
    }

    /// Invokes the stored callback with the borrowed controller and no request body,
    /// then awaits the future it returns.
    pub async fn call(&self) -> HandlerResult {
        (self.cb)(self.ctrl, None).await
    }

    /// Empty placeholder; currently does nothing.
    async fn route() {}
}

/// Prints the incoming request with `println!` and delegates to `Router::call`.
///
/// The request itself is not forwarded to the router.
///
/// NOTE(review): `&'a Router<'a>` requires the borrow of the router to last exactly as
/// long as the router's own lifetime parameter. This likely contributes to the
/// "does not live long enough" error at the call site; `&Router<'_>` would decouple
/// the two lifetimes. Confirm against the caller before changing.
pub async fn service<'a>(
    req: Request<hyper::body::Incoming>,
    router: &'a Router<'a>,
) -> HandlerResult {
    println!("Request received {:?}", req);
    router.call().await
}

the main looks as follows:

use std::process::ExitCode;

use api::service;
use clap::Parser;
use futures_concurrency::future::FutureGroup;
use hyper_util::rt::TokioIo;
use hyper_util::service::TowerToHyperService;
use log::{error, info};
use std::thread;
use tokio;
use tower::ServiceBuilder;

#[derive(Parser, Debug)]
#[command(version, about, long_about=None)]
struct Args { // command-line arguments, parsed through the `Parser` derive
    #[arg(long = "port")]
    port: Option<String>, // value of `--port`; `None` when the flag is not given
}

#[derive(thiserror::Error, Debug)]
pub enum ServiceError { // error type returned by `main`
    #[error("Io error: {0}")]
    Io(#[from] std::io::Error), // `#[from]` lets `?` convert a `std::io::Error` into this variant
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<ExitCode, ServiceError> { // binds a listener, then serves every accepted connection inside one async scope
    let args = Args::parse();
    let port = match args.port {
        Some(port) => port,
        None => "80".to_string(), // default port when `--port` is not given
    };
    let addr = format!("localhost:{}", port);
    let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); // panics if binding fails
    info!("listening on {}", addr);
    let ctrl = controller::Controller {};
    let srv = service::Router::new(&ctrl); // the router borrows `ctrl`

    //    let mut group = FutureGroup::new();

    let result = moro_local::async_scope!(|scope| -> Result<ExitCode, ServiceError> {
        loop { // accept loop; it only ends when `accept` fails and `?` returns the error
            let (stream, _) = listener.accept().await?;
            let io = hyper_util::rt::TokioIo::new(stream);
            scope.spawn(async { // one task per connection, spawned into the enclosing scope
                if let Err(err) = hyper::server::conn::http1::Builder::new()
                    .serve_connection(
                        io,
                        hyper::service::service_fn(|req| service::service(req, &srv)), // every request borrows `srv`
                    )
                    .await
                {
                    println!("Error serving connection: {:?}", err);
                }
            });
        }
    })
    .await;
    result
}

The error:

error[E0597]: `srv` does not live long enough
  --> service/src/main.rs:49:81
   |
41 |       let result = moro_local::async_scope!(|scope| -> Result<ExitCode, ServiceError> {
   |  __________________-
42 | |         loop {
43 | |             let (stream, _) = listener.accept().await?;
44 | |             let io = hyper_util::rt::TokioIo::new(stream);
...  |
49 | |                         hyper::service::service_fn(|req| service::service(req, &srv)),
   | |                                                                                 ^^^ borrowed value does not live long enough
...  |
56 | |         }
57 | |     })
   | |______- value captured here
...
60 |   }
   |   -
   |   |
   |   `srv` dropped here while still borrowed
   |   borrow might be used here, when `srv` is dropped and runs the destructor for type `Router<'_>`

The problem is the AsyncCallback, and I am unable to find a working solution. Can anybody help me?

EDIT: it works using an Arc<>

use bytes::Bytes;
use controller::Controller;
use http_body_util::{BodyExt, Empty, Full};
use hyper::{body, header, Response};
use hyper::{Request, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

mod endpoints;

/// Error type used by the router's handlers and response bodies.
///
/// The enum declares no variants, so no value of this type can currently be constructed.
#[derive(thiserror::Error, Debug)]
pub enum RouterError {}

/// Boxed handler callback taking a shared handle to the controller (`Arc`) and an
/// optional request body; it returns a boxed, pinned future yielding a `HandlerResult`.
///
/// No lifetime parameter is needed because the controller is passed by owned `Arc`.
type AsyncCallback = Box<
    dyn Fn(Arc<Controller>, Option<body::Incoming>) -> Pin<Box<dyn Future<Output = HandlerResult>>>,
>;

/// Type-erased HTTP body yielding `Bytes` chunks, with `RouterError` as its error type.
type BoxBody = http_body_util::combinators::BoxBody<Bytes, RouterError>;

/// Result type of the handlers: an HTTP response with a `BoxBody`, or a `RouterError`.
type HandlerResult = std::result::Result<Response<BoxBody>, RouterError>;

/// Builds a response body that carries no data.
///
/// The empty `match` only type-checks because the error type of `Empty` has no values;
/// it adapts that error type to the one required by `BoxBody`.
fn empty() -> BoxBody {
    let body = Empty::<Bytes>::new();
    body.map_err(|impossible| match impossible {}).boxed()
}
/// Builds a response body containing `chunk`.
///
/// `chunk` is converted into `Bytes` first. The empty `match` only type-checks because
/// the error type of `Full` has no values; it adapts that error type to `BoxBody`'s.
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody {
    let payload: Bytes = chunk.into();
    let body = Full::new(payload);
    body.map_err(|impossible| match impossible {}).boxed()
}

/// Returns a `200 OK` response whose body is `js` and whose `Content-Type` header is
/// `application/json`.
///
/// `js` is sent as-is; this function does not check that it is valid JSON.
///
/// # Panics
///
/// Panics if the response builder reports an error (via `unwrap`).
fn resp_json_body(js: String) -> Response<BoxBody> {
    let builder = Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "application/json");
    builder.body(full(js)).unwrap()
}

/// Returns a `200 OK` response with an empty body.
///
/// # Panics
///
/// Panics if the response builder reports an error (via `unwrap`).
fn resp_ok_no_body() -> Response<BoxBody> {
    let builder = Response::builder().status(StatusCode::OK);
    builder.body(empty()).unwrap()
}

/// Router holding a shared handle to the controller and the callback it dispatches to.
pub struct Router {
    /// Shared controller; a clone of this handle is passed to the callback on each call.
    ctrl: Arc<controller::Controller>,
    /// Handler invoked by `call`.
    cb: AsyncCallback,
}

/// Placeholder handler: prints a greeting and answers `200 OK` with an empty body.
///
/// Both arguments are ignored.
async fn test(_ctrl: Arc<Controller>, _body: Option<body::Incoming>) -> HandlerResult {
    println!("hello veloo");
    let response = resp_ok_no_body();
    Ok(response)
}

impl Router {
    pub fn new(ctrl: Arc<controller::Controller>) -> Self {
        Router {
            ctrl,
            cb: Box::new(|ctrl: Arc<Controller>, body: Option<body::Incoming>| {
                Box::pin(test(ctrl, body))
            }),
        }
    }

    pub async fn call(&self) -> HandlerResult {
        (self.cb)(self.ctrl.clone(), None).await
    }

    async fn route() {}
}

pub async fn service<'a>(req: Request<hyper::body::Incoming>, router: &'a Router) -> HandlerResult {
    println!("Request received {:?}", req);
    router.call().await
}

I just don't think it should be necessary, since the router is created at the beginning of the program and the async scope terminates before the controller goes out of scope.

Your AsyncCallback is saying that the callback takes one lifetime which is the same for all calls as when it was constructed, and which is invariant (can be neither lengthened nor shortened). I don't follow the whole problem, but this is usually not a good idea. Try this instead:

/// Boxed handler callback that is higher-ranked over the borrow lifetime: `for<'a>`
/// lets every call pass a controller borrow with its own lifetime, and the returned
/// future may borrow for that same lifetime.
type AsyncCallback = Box<
    dyn for<'a> Fn(
            &'a Controller,
            Option<body::Incoming>,
        ) -> Pin<Box<dyn Future<Output = HandlerResult> + 'a>>
>;

This way, the callback function can take a different lifetime of borrow each time it is called. That is usually a better idea.

2 Likes

Wow! Thank you <3 it works. I don't really understand it but i guess it has something to do with HRTB. I will dig more into that thank you.

Here is a simpler, non-async program which may help illustrate the problem:

/// Callback tied to one lifetime `'a`, fixed where the alias is instantiated.
type FnOneLt<'a> = Box<dyn Fn(&'a str) -> &'a str>;
/// Callback that accepts a borrow of any lifetime `'b`, chosen anew at each call.
type FnAnyLt = Box<dyn for<'b> Fn(&'b str) -> &'b str>;
// equivalent to: Box<dyn Fn(&str) -> &str>

/// Returns the last five bytes of `s` as a sub-slice borrowed from `s`.
///
/// If `s` is shorter than five bytes, or the cut would fall inside a multi-byte
/// character, the whole of `s` is returned instead of panicking.
fn a_function<'c>(s: &'c str) -> &'c str {
    // `saturating_sub` avoids the underflow panic that `s.len() - 5` hits on short input.
    let start = s.len().saturating_sub(5);
    // `get` yields `None` when `start` is not on a char boundary, where slicing would panic.
    s.get(start..).unwrap_or(s)
}

/// Calls `f` on two separate borrows of a local `String`, mutating the string in
/// between, and prints both results.
fn example(f: FnAnyLt) { // Try putting `FnOneLt` here. What lifetime should it have?
    let mut s = String::from("hello");
    print!("{}", f(&s)); // This borrow's lifetime ...
    s.push_str(" world");
    println!(" {}", f(&s)); // ...is necessarily different from, and disjoint from, this one
}

/// Boxes `a_function` and passes it to `example`.
fn main() {
    example(Box::new(a_function));
}

I also gave each introduced lifetime a different name, to be clear that none of them are intrinsically the same as each other.

1 Like

Thanks for your example. If I may, I have another example to look at:

/// Higher-ranked handler callback: every call may use a different borrow lifetime `'a`.
type AsyncCallback = Box<
    dyn for<'a> Fn(
        &'a Controller,
        Option<body::Incoming>,
    ) -> Pin<Box<dyn Future<Output = HandlerResult> + 'a>>,
>;
/// Text callback with a single lifetime `'a` shared by the argument, the returned
/// future and the boxed closure.
type AsyncCallbackT<'a> = Box<dyn Fn(&'a str) -> Pin<Box<dyn Future<Output = ()> + 'a>> + 'a>;

/// Router holding a borrowed controller and two callbacks.
pub struct Router<'a> {
    /// Controller passed to `cb` on every call.
    ctrl: &'a controller::Controller,
    /// Higher-ranked handler invoked by `call`.
    cb: AsyncCallback,
    /// Single-lifetime text handler invoked by `callT`.
    cbT: AsyncCallbackT<'a>,
}

/// Placeholder handler: prints a greeting and answers `200 OK` with an empty body.
///
/// Both arguments are ignored.
async fn test(_ctrl: &Controller, _body: Option<body::Incoming>) -> HandlerResult {
    println!("hello veloo");
    let response = resp_ok_no_body();
    Ok(response)
}

/// Prints `hello ` followed by `txt`.
async fn testT(txt: &str) {
    println!("hello {txt}");
}

impl<'a> Router<'a> {
    /// Creates a router that borrows `ctrl` for `'a`; `cb` dispatches to `test` and
    /// `cbT` dispatches to `testT`.
    pub fn new(ctrl: &'a controller::Controller) -> Self {
        Router {
            ctrl,
            // No named lifetime on `&Controller`: the closure must work for any borrow,
            // as the higher-ranked `AsyncCallback` alias requires.
            cb: Box::new(|ctrl: &Controller, body: Option<body::Incoming>| {
                Box::pin(test(ctrl, body))
            }),
            cbT: Box::new(|txt: &str| Box::pin(testT(txt))),
        }
    }

    /// Invokes `cb` with the borrowed controller and no request body, then awaits it.
    pub async fn call(&self) -> HandlerResult {
        (self.cb)(self.ctrl, None).await
    }
    /// Invokes `cbT` with `txt` and awaits it.
    ///
    /// `txt` must live for the router's own lifetime `'a`, because `cbT` is an
    /// `AsyncCallbackT<'a>`.
    pub async fn callT(&self, txt: &'a str) {
        (self.cbT)(txt).await
    }

    /// Empty placeholder; currently does nothing.
    async fn route() {}
}

/// Binds a listener, builds the router and calls `callT` twice with string literals.
///
/// The listener is only bound and logged; no connection is accepted in this version.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let args = Args::parse();
    let port = match args.port {
        Some(port) => port,
        // Default port when `--port` is not given.
        None => "80".to_string(),
    };
    let addr = format!("localhost:{}", port);
    // Panics if binding fails.
    let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
    info!("listening on {}", addr);
    let ctrl = controller::Controller {};
    let srv = service::Router::new(&ctrl);
    // String literals are `&'static str`, so they satisfy the `&'a str` that `callT`
    // requires whatever `'a` turns out to be.
    srv.callT("hallo").await;
    srv.callT("velo").await;

}

If I use it like this with cbT, there is no lifetime problem. So I tried to test this on the example you provided:

use std::future::Future;
use std::pin::Pin;
/// Callback tied to one lifetime `'a`; it returns nothing.
type FnOneLt<'a> = Box<dyn Fn(&'a str) -> ()>;
/// Callback that accepts a borrow of any lifetime `'b`, chosen anew at each call.
type FnAnyLt = Box<dyn for<'b> Fn(&'b str) -> &'b str>;
// equivalent to: Box<dyn Fn(&str) -> &str>

/// Async text callback with a single lifetime `'a` shared by the argument, the returned
/// future and the boxed closure.
type AsyncCallback<'a> = Box<dyn Fn(&'a str) -> Pin<Box<dyn Future<Output = ()> + 'a>> + 'a>;

/// Prints `s` followed by a newline.
fn a_function<'c>(s: &'c str) {
    // &s[s.len() - 5..]
    println!("{s}");
}

/// Async function that prints `s` followed by a newline.
async fn b_function<'a>(s: &'a str) {
    println!("{s}");
}

/// Calls `f` with a borrow of the owned string `s`.
///
/// The lifetime elided in `AsyncCallback` is a parameter of this function and is
/// therefore chosen by the caller, while `s` is dropped when this function returns.
/// The borrow `&s` cannot last for a caller-chosen lifetime, which is why the compiler
/// reports that `s` does not live long enough.
fn example(f: AsyncCallback, s: String) {
    // Try putting `FnOneLt` here. What lifetime should it have?
    // print!("{:?}", f(&s)); // This borrow's lifetime ...
    // The returned future is dropped here without being awaited.
    f(&s);
    // s.push_str(" world");
    // println!(" {}", f(&s)); // ...is necessarily different from, and disjoint from, this one
}

/// Builds the boxed callback around `b_function` and moves an owned `String` into
/// `example`.
fn main() {
    let s = String::from("hello");
    let fx: AsyncCallback = Box::new(|c: &str| Box::pin(b_function(c)));
    example(fx, s);
}

This tells me that s does not live long enough, and I don't understand the difference here.

i found what the difference is:

use std::future::Future;
use std::pin::Pin;
/// Callback tied to one lifetime `'a`; it returns nothing.
type FnOneLt<'a> = Box<dyn Fn(&'a str) -> ()>;
/// Callback that accepts a borrow of any lifetime `'b`, chosen anew at each call.
type FnAnyLt = Box<dyn for<'b> Fn(&'b str) -> &'b str>;
// equivalent to: Box<dyn Fn(&str) -> &str>

/// Async text callback with a single lifetime `'a` shared by the argument, the returned
/// future and the boxed closure.
type AsyncCallback<'a> = Box<dyn Fn(&'a str) -> Pin<Box<dyn Future<Output = ()> + 'a>> + 'a>;

/// Prints `s` followed by a newline.
fn a_function<'c>(s: &'c str) {
    // &s[s.len() - 5..]
    println!("{s}");
}

/// Async function that prints `s` followed by a newline.
async fn b_function<'a>(s: &'a str) {
    println!("{s}");
}

/// Calls `f` with `s` and awaits the returned future.
///
/// The caller supplies `s` with the same lifetime `'a` as the callback, so no borrow of
/// a local variable of this function is involved.
async fn example<'a>(f: AsyncCallback<'a>, s: &'a str) {
    // Try putting `FnOneLt` here. What lifetime should it have?
    // print!("{:?}", f(&s)); // This borrow's lifetime ...
    f(s).await;
    // s.push_str(" world");
    // println!(" {}", f(&s)); // ...is necessarily different from, and disjoint from, this one
}
/// Builds the boxed callback around `b_function` and runs `example` with a string
/// literal.
#[tokio::main]
async fn main() {
    let fx: AsyncCallback = Box::new(|c: &str| Box::pin(b_function(c)));
    // A string literal is `&'static str`, so it is valid for whatever lifetime `fx` has.
    // The unused local `String` from the previous version is dropped here.
    example(fx, "asdf").await;
}

If, instead of passing a String, I pass a &str, which is already a reference and was created outside the scope of example, then it satisfies the lifetime requirements. That's why it worked for me in the first place.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.