I am trying to implement ConnectionManager
that allows you to send requests (send_request
) asynchronously using some protocol (choice of protocol is not important).
Responses are matched back to the original request using unique request IDs.
(toy implementation is provided below)
More concretely, send_request
generates request ID then it inserts it into the multiplexer table with a corresponding sender, sends the request data, and waits for the response.
Responses are queued to response_queue
.
ConnectionManager.next
needs to be called to dequeue responses and send them to the corresponding requests' senders.
In the code below it spawns a task to call ConnectionManager.next
every second.
Then it sends a request and wait for the response.
But it has a deadlock issue because the send_request
method holds a lock on the ConnectionManager
while waiting for a response, and the next
also needs the same lock to process responses.
What are the possible solutions?
One solution I came up is to spawn a task to call next
periodically in some method in ConnectionManager so that I don't have to use Mutex first of all. But its downside is I cannot control when next
is called even if I want.
I would like to know if there are simpler solutions.
Thanks in advance!
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex,
};
type ReqId = String;
struct Response {
req_id: String,
data: String,
}
struct ConnectionManager {
multiplexer_table: HashMap<ReqId, UnboundedSender<String>>,
response_queue: UnboundedReceiver<Response>,
}
impl ConnectionManager {
fn new(response_queue: UnboundedReceiver<Response>) -> Self {
Self {
multiplexer_table: HashMap::new(),
response_queue,
}
}
async fn next(&mut self) {
let response = self.response_queue.recv().await.unwrap();
let _ = self.multiplexer_table
.get_mut(&response.req_id)
.unwrap()
.send(response.data);
}
fn gen_uniq_req_id() -> String {
// generate some uniq id
// implementation is ommited for brevity
return String::from("");
}
async fn send_data(&self, req_id: &str, data: &str) {
// send data on the wire
// implementation is ommited for brevity
}
async fn send_request(&mut self, data: &str) -> String {
let req_id = Self::gen_uniq_req_id();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
self.multiplexer_table.insert(req_id.clone(), tx);
// FIXME: need to clean up tx at the end of the request
self.send_data(&req_id, data).await;
return rx.recv().await.unwrap();
}
}
#[tokio::main]
async fn main() {
let (response_queue_tx, response_queue_rx) = tokio::sync::mpsc::unbounded_channel::<Response>();
let mut manager = Arc::new(Mutex::new(ConnectionManager::new(response_queue_rx)));
let manager_cloned = manager.clone();
tokio::spawn(async move {
loop {
manager_cloned.lock().await.next().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
let response = manager.clone().lock().await.send_request("hello").await;
print!("{}", response);
}
``