My code as below:
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use prost::Message;
use tokio::select;
use tonic::{Code, Request, Response, Status};
use crate::interfaces::IModuleMgr;
use crate::{MODULE_MGR};
use crate::proto::test::{ CarTestCallback,};
use crate::proto::test::car_test_service_server::CarTestService;
use crate::modules::test::test_constants::constants;
use crate::interfaces::ITestBusiness;
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
pub struct TestModuleService {
map: Mutex<HashMap<String, u64>>,
}
type ResponseStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
impl TestModuleService {
pub fn new() -> Self {
Self {
map: Mutex::new(HashMap::new()),
}
}
}
async fn callback_calcel_handler<FRequest, FCancellation,T>(
request_future: FRequest,
cancellation_future: FCancellation,
) -> Result<Response<ResponseStream<T>>, Status>
where
FRequest: Future<Output = Result<Response<ResponseStream<T>>, Status>> + Send + 'static,
FCancellation:Future<Output = Result<Response<ResponseStream<T>>, Status>> + Send + 'static,
T: Send + 'static,
{
let token = CancellationToken::new();
// Will call token.cancel() when the future is dropped, such as when the client cancels the request
let _drop_guard = token.clone().drop_guard();
let select_task = tokio::spawn(async move {
// Can select on token cancellation on any cancellable future while handling the request,
// allowing for custom cleanup code or monitoring
select! {
res = request_future => res,
_ = token.cancelled() => cancellation_future.await,
}
});
select_task.await.unwrap()
}
#[tonic::async_trait]
impl CarTestService for TestModuleService {
type registerTestCallbackStream = ResponseStream<CarTestCallback>;
async fn register_test_callback(&self, request: Request<(Empty)>)
-> Result<Response<Self::registerTestCallbackStream>, Status> {
let key = request.remote_addr().unwrap().to_string();
let key_clone = key.clone();
let request_future = async move {
println!("register_door_callback remote_addr {:?}", key);
let (tx, rx) = mpsc::channel(64);
let tx_arc = Arc::new(tx);
//todo ....
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::registerTestCallbackStream
))
};
let cancellation_future = async move {
println!("registered from {:?} cancelled by client", key_clone);
Err(Status::cancelled("Register cancelled by client"))
};
callback_calcel_handler(request_future, cancellation_future).await
}
}
compile error:
error[E0521]: borrowed data escapes outside of method
| --> test_project/src/modules/test/test_module_service.rs:265:9
| |
| 201 | async fn register_test_callback(&self, request: Request<(Empty)>)
| | -----
| | |
| | `self` is a reference that is only valid in the method body
| | lifetime `'life0` defined here
| ...
| 265 | test_callback_calcel_handler(request_future, cancellation_future).await
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| | |
| | `self` escapes the method body here
| | argument requires that `'life0` must outlive `'static`
What's wrong with my code? Why is the lifetimes of self must be 'static