Fixing deadlock in request-response pattern

I am trying to implement a ConnectionManager that lets you send requests asynchronously (send_request) over some protocol (the choice of protocol is not important).
Responses are matched back to the original request using unique request IDs.
(A toy implementation is provided below.)

More concretely, send_request generates a request ID, inserts it into the multiplexer table together with a corresponding sender, sends the request data, and waits for the response.
Responses are queued to response_queue.
ConnectionManager.next needs to be called to dequeue responses and send them to the corresponding requests' senders.

In the code below, a task is spawned to call ConnectionManager.next every second.
Then a request is sent and the response is awaited.

But it has a deadlock issue: the send_request method holds a lock on the ConnectionManager while waiting for a response, and next needs the same lock to process responses.

What are the possible solutions?
One solution I came up with is to spawn a task inside ConnectionManager itself that calls next periodically, so that I don't have to use a Mutex in the first place (a rough sketch of this is included after the code below). The downside is that I can no longer control when next is called, even if I want to.
I would like to know if there are simpler solutions.
Thanks in advance!

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{
    mpsc::{UnboundedReceiver, UnboundedSender},
    Mutex,
};

type ReqId = String;

struct Response {
    req_id: String,
    data: String,
}

struct ConnectionManager {
    multiplexer_table: HashMap<ReqId, UnboundedSender<String>>,
    response_queue: UnboundedReceiver<Response>,
}

impl ConnectionManager {
    fn new(response_queue: UnboundedReceiver<Response>) -> Self {
        Self {
            multiplexer_table: HashMap::new(),
            response_queue,
        }
    }

    async fn next(&mut self) {
        let response = self.response_queue.recv().await.unwrap();
        let _ = self.multiplexer_table
            .get_mut(&response.req_id)
            .unwrap()
            .send(response.data);
    }

    fn gen_uniq_req_id() -> String {
        // generate some unique id
        // implementation is omitted for brevity
        return String::from("");
    }

    async fn send_data(&self, req_id: &str, data: &str) {
        // send data on the wire
        // implementation is omitted for brevity
    }

    async fn send_request(&mut self, data: &str) -> String {
        let req_id = Self::gen_uniq_req_id();
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        self.multiplexer_table.insert(req_id.clone(), tx);
        // FIXME: need to clean up tx at the end of the request
        self.send_data(&req_id, data).await;
        return rx.recv().await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let (response_queue_tx, response_queue_rx) = tokio::sync::mpsc::unbounded_channel::<Response>();
    let manager = Arc::new(Mutex::new(ConnectionManager::new(response_queue_rx)));
    let manager_cloned = manager.clone();
    tokio::spawn(async move {
        loop {
            manager_cloned.lock().await.next().await;
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });
    let response = manager.clone().lock().await.send_request("hello").await;
    print!("{}", response);
}
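For reference, here is a rough sketch of the alternative I mentioned above: spawning the routing task inside ConnectionManager so callers never lock the whole manager, with each request waiting on its own oneshot channel. This is only a sketch, not a finished design: it assumes new() is called inside a Tokio runtime (it uses tokio::spawn), it replaces the per-request mpsc channel with a oneshot, and it drops next entirely, so it keeps the downside that I cannot control when responses are drained.

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, oneshot, Mutex};

type ReqId = String;

struct Response {
    req_id: ReqId,
    data: String,
}

struct ConnectionManager {
    // Only the table is shared; the lock is held just long enough to insert or remove.
    multiplexer_table: Arc<Mutex<HashMap<ReqId, oneshot::Sender<String>>>>,
}

impl ConnectionManager {
    fn new(mut response_queue: mpsc::UnboundedReceiver<Response>) -> Self {
        let multiplexer_table = Arc::new(Mutex::new(HashMap::new()));
        let table = Arc::clone(&multiplexer_table);
        // The spawned task owns the receiver, so nothing awaits recv() while holding a lock.
        tokio::spawn(async move {
            while let Some(response) = response_queue.recv().await {
                if let Some(tx) = table.lock().await.remove(&response.req_id) {
                    let _ = tx.send(response.data);
                }
            }
        });
        Self { multiplexer_table }
    }

    async fn send_request(&self, data: &str) -> String {
        let req_id = Self::gen_uniq_req_id();
        let (tx, rx) = oneshot::channel::<String>();
        // Lock only for the insert; the guard is dropped before awaiting the response.
        self.multiplexer_table.lock().await.insert(req_id.clone(), tx);
        self.send_data(&req_id, data).await;
        rx.await.unwrap()
    }

    fn gen_uniq_req_id() -> ReqId {
        // generate some unique id (omitted, as in the original)
        String::from("")
    }

    async fn send_data(&self, _req_id: &str, _data: &str) {
        // send data on the wire (omitted, as in the original)
    }
}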

You're calling recv() on an UnboundedReceiver, which will block until there is a message available. You could use try_recv() instead.
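For example, a non-waiting variant could drain whatever is already queued and return immediately. A sketch, assuming the ConnectionManager fields from the code above (drain_responses is just an illustrative name):

impl ConnectionManager {
    // Drain every response that is already queued, without waiting for new ones,
    // so the caller (and any lock it holds) is released right away.
    fn drain_responses(&mut self) {
        while let Ok(response) = self.response_queue.try_recv() {
            if let Some(tx) = self.multiplexer_table.get(&response.req_id) {
                let _ = tx.send(response.data);
            }
        }
    }
}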

Your logic is to store the channels in a HashMap and choose one by ID.

However, there is a better model.

Let's change ConnectionManager to an Executor, and wrap the data in a Job.

When you create a new Executor, you create an mpsc channel of Job, spawn some workers (tasks) that recv Jobs from rx, and return an Executor containing tx.

In the main thread, you create a oneshot::channel::<Resp>, put its sender into Job { tx: oneshot::Sender<Resp>, data: String }, and send the Job to the workers through the Executor.

When a worker finishes its job, it sends the Resp through job.tx, and the main thread receives it.

You can also wrap Job in an enum like Kind::Do(Job), Kind::Shutdown and so on to control the workers (see the sketch below).

This removes the lock on the HashMap in your example, and the pattern is widely used and taught in many Rust tutorials.
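For illustration, a minimal sketch of such a wrapper enum might look like this (the worker body is only a placeholder):

use tokio::sync::{mpsc, oneshot};

struct Resp {
    data: String,
}

struct Job {
    tx: oneshot::Sender<Resp>,
    data: String,
}

// Wrapping jobs in an enum lets the same channel also carry control messages.
enum Kind {
    Do(Job),
    Shutdown,
}

async fn worker(mut rx: mpsc::Receiver<Kind>) {
    while let Some(msg) = rx.recv().await {
        match msg {
            Kind::Do(Job { tx, data }) => {
                // Do the real work here, then reply on the job's oneshot channel.
                let _ = tx.send(Resp { data });
            }
            Kind::Shutdown => break,
        }
    }
}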

@hax10
Thank you! I want to keep the signature of the send_request method to lessen the burden on the caller. Is this possible using try_recv?

@kingwingfly
Thanks!
I am assuming you receive responses on "one" channel (response_queue) and have to look at the request ID in each payload to determine which sender channel to send the response to.
How does this fit into your model?

The channel for returning the response is sent along with the Job.

use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::oneshot;

struct Exe {
    tx: Sender<Job>,
}

struct Job {
    tx: oneshot::Sender<Resp>,
    data: String,
}

struct Resp {
    data: String,
}

impl Exe {
    fn new() -> Self {
        let (tx, mut rx) = channel::<Job>(4);
        // Spawn a worker task that receives jobs and replies on each job's oneshot channel.
        tokio::spawn(async move {
            while let Some(job) = rx.recv().await {
                let Job { tx, data } = job;
                // Echo the data back; a real worker would do its actual work here.
                tx.send(Resp { data }).ok();
            }
        });
        Self { tx }
    }
    
    async fn spawn(&self, data: String) -> Resp {
        let (tx, rx) = oneshot::channel::<Resp>();
        self.tx.send(Job { tx, data }).await.expect("worker has shut down");
        rx.await.expect("no!!!")
    }
}
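A minimal usage might look like this (assuming a Tokio runtime via #[tokio::main]):

#[tokio::main]
async fn main() {
    let exe = Exe::new();
    let resp = exe.spawn(String::from("hello")).await;
    println!("{}", resp.data);
}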

Let me clarify my issue a bit more.
There is a callback function that is called whenever a response message (containing a request ID and a response body) is received from a peer. The callback function enqueues the messages to a queue.
Each response body needs to be sent to the channel whose request ID matches the message's request ID.

In your code, you assume you know which tx to send a response to without looking at the request ID in the message, but in my case you need to look at the request ID to decide which tx to send the response to.
And to do so, I guess you need to keep track of each tx by request_id in a HashMap.
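Putting the two ideas together, here is a minimal sketch (not from the thread; the names dispatcher and registrations are illustrative) in which a single task owns the HashMap of oneshot senders, send_request registers its sender through a channel, and responses from response_queue are routed by request ID, so no Mutex is ever held across an .await:

use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

type ReqId = String;

struct Response {
    req_id: ReqId,
    data: String,
}

// The dispatcher task owns the HashMap outright, so no Mutex is needed.
async fn dispatcher(
    mut registrations: mpsc::UnboundedReceiver<(ReqId, oneshot::Sender<String>)>,
    mut response_queue: mpsc::UnboundedReceiver<Response>,
) {
    let mut multiplexer_table: HashMap<ReqId, oneshot::Sender<String>> = HashMap::new();
    loop {
        // `biased` makes registrations win when both channels are ready, so a sender
        // registered before its request hit the wire is always in the table by the
        // time the matching response is routed.
        tokio::select! {
            biased;
            Some((req_id, reply_tx)) = registrations.recv() => {
                multiplexer_table.insert(req_id, reply_tx);
            }
            Some(response) = response_queue.recv() => {
                // remove() also cleans up the entry once the response is routed.
                if let Some(reply_tx) = multiplexer_table.remove(&response.req_id) {
                    let _ = reply_tx.send(response.data);
                }
            }
            else => break,
        }
    }
}

// The sending side registers a oneshot sender, puts the request on the wire,
// and awaits the reply; it never touches the HashMap directly.
async fn send_request(
    registrations: &mpsc::UnboundedSender<(ReqId, oneshot::Sender<String>)>,
    req_id: ReqId,
    _data: &str,
) -> String {
    let (tx, rx) = oneshot::channel();
    registrations.send((req_id.clone(), tx)).expect("dispatcher has shut down");
    // ... send req_id + data on the wire here ...
    rx.await.expect("dispatcher dropped the request")
}

The dispatcher would be started once with tokio::spawn(dispatcher(...)), and send_request can then be called concurrently from many tasks, each waiting only on its own oneshot receiver.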
