Hello newbie here.
I have been working with rust on and off for the past year. I did a project where i created an api using hyper. The set up was using #[tokio::main(flavor = "current_thread")] and moro_local
now i am trying to augment our design a bit and i want to use async callbacks to set up handlers for my endpoints
use controller::Controller;
use http_body_util::{BodyExt, Empty, Full};
use hyper::{body, header, Response};
use hyper::{Request, StatusCode};
use std::future::Future;
use std::pin::Pin;
mod endpoints;
#[derive(thiserror::Error, Debug)]
pub enum RouterError {}
type AsyncCallback<'a> = Box<
dyn Fn(
&'a Controller,
Option<body::Incoming>,
) -> Pin<Box<dyn Future<Output = HandlerResult> + 'a>>
+ 'a,
>;
type BoxBody = http_body_util::combinators::BoxBody<Bytes, RouterError>;
type HandlerResult = std::result::Result<Response<BoxBody>, RouterError>;
///
/// Helper to create empty http response
///
fn empty() -> BoxBody {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
///
/// Helper to create http respone with body
///
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
///
/// helper to return 200 with json body
///
fn resp_json_body(js: String) -> Response<BoxBody> {
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.body(full(js))
.unwrap()
}
///
/// helper to return 200 with no body
///
fn resp_ok_no_body() -> Response<BoxBody> {
Response::builder()
.status(StatusCode::OK)
.body(empty())
.unwrap()
}
pub struct Router<'a> {
ctrl: &'a controller::Controller,
cb: AsyncCallback<'a>,
}
async fn test(_ctrl: &Controller, _body: Option<body::Incoming>) -> HandlerResult {
println!("hello veloo");
Ok(resp_ok_no_body())
}
impl<'a> Router<'a> {
pub fn new(ctrl: &'a controller::Controller) -> Self {
Router {
ctrl,
cb: Box::new(|ctrl: &'a Controller, body: Option<body::Incoming>| {
Box::pin(test(ctrl, body))
}),
}
}
pub async fn call(&self) -> HandlerResult {
(self.cb)(self.ctrl, None).await
}
async fn route() {}
}
pub async fn service<'a>(
req: Request<hyper::body::Incoming>,
router: &'a Router<'a>,
) -> HandlerResult {
println!("Request received {:?}", req);
router.call().await
}
the main looks as follows:
use std::process::ExitCode;
use api::service;
use clap::Parser;
use futures_concurrency::future::FutureGroup;
use hyper_util::rt::TokioIo;
use hyper_util::service::TowerToHyperService;
use log::{error, info};
use std::thread;
use tokio;
use tower::ServiceBuilder;
#[derive(Parser, Debug)]
#[command(version, about, long_about=None)]
struct Args {
#[arg(long = "port")]
port: Option<String>,
}
#[derive(thiserror::Error, Debug)]
pub enum ServiceError {
#[error("Io error: {0}")]
Io(#[from] std::io::Error),
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<ExitCode, ServiceError> {
let args = Args::parse();
let port = match args.port {
Some(port) => port,
None => "80".to_string(),
};
let addr = format!("localhost:{}", port);
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
info!("listening on {}", addr);
let ctrl = controller::Controller {};
let srv = service::Router::new(&ctrl);
// let mut group = FutureGroup::new();
let result = moro_local::async_scope!(|scope| -> Result<ExitCode, ServiceError> {
loop {
let (stream, _) = listener.accept().await?;
let io = hyper_util::rt::TokioIo::new(stream);
scope.spawn(async {
if let Err(err) = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
hyper::service::service_fn(|req| service::service(req, &srv)),
)
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
})
.await;
result
}
The error:
error[E0597]: `srv` does not live long enough
--> service/src/main.rs:49:81
|
41 | let result = moro_local::async_scope!(|scope| -> Result<ExitCode, ServiceError> {
| __________________-
42 | | loop {
43 | | let (stream, _) = listener.accept().await?;
44 | | let io = hyper_util::rt::TokioIo::new(stream);
... |
49 | | hyper::service::service_fn(|req| service::service(req, &srv)),
| | ^^^ borrowed value does not live long enough
... |
56 | | }
57 | | })
| |______- value captured here
...
60 | }
| -
| |
| `srv` dropped here while still borrowed
| borrow might be used here, when `srv` is dropped and runs the destructor for type `Router<'_>`
the problem is the AsyncCallback and i am unable to find a working solution. can anybody help me ?
EDIT: it works using an Arc<>
use bytes::Bytes;
use controller::Controller;
use http_body_util::{BodyExt, Empty, Full};
use hyper::{body, header, Response};
use hyper::{Request, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
mod endpoints;
#[derive(thiserror::Error, Debug)]
pub enum RouterError {}
type AsyncCallback = Box<
dyn Fn(Arc<Controller>, Option<body::Incoming>) -> Pin<Box<dyn Future<Output = HandlerResult>>>,
>;
type BoxBody = http_body_util::combinators::BoxBody<Bytes, RouterError>;
type HandlerResult = std::result::Result<Response<BoxBody>, RouterError>;
///
/// Helper to create empty http response
///
fn empty() -> BoxBody {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
///
/// Helper to create http respone with body
///
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
///
/// helper to return 200 with json body
///
fn resp_json_body(js: String) -> Response<BoxBody> {
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.body(full(js))
.unwrap()
}
///
/// helper to return 200 with no body
///
fn resp_ok_no_body() -> Response<BoxBody> {
Response::builder()
.status(StatusCode::OK)
.body(empty())
.unwrap()
}
pub struct Router {
ctrl: Arc<controller::Controller>,
cb: AsyncCallback,
}
async fn test(_ctrl: Arc<Controller>, _body: Option<body::Incoming>) -> HandlerResult {
println!("hello veloo");
Ok(resp_ok_no_body())
}
impl Router {
pub fn new(ctrl: Arc<controller::Controller>) -> Self {
Router {
ctrl,
cb: Box::new(|ctrl: Arc<Controller>, body: Option<body::Incoming>| {
Box::pin(test(ctrl, body))
}),
}
}
pub async fn call(&self) -> HandlerResult {
(self.cb)(self.ctrl.clone(), None).await
}
async fn route() {}
}
pub async fn service<'a>(req: Request<hyper::body::Incoming>, router: &'a Router) -> HandlerResult {
println!("Request received {:?}", req);
router.call().await
}
i just dont think it should be necessary since the router is created at the beginning of the program and the async scope is terminated before the controller is out of scope.