Deadlock using Thrussh to build a server

I am trying to build an SSH server using the thrussh crate that receives commands from clients, forwards them to a world thread, and lets that world thread send replies to the clients (players). I hit a problem (the SSH server thread deadlocking somewhere) that I am unsure how to approach. I tried to keep the explanation short and hopefully clear; otherwise I am happy to reformulate.

To achieve communication from the SSH server to the world thread, I open a command channel and a data channel as mpsc::channel, with the tx side of both wrapped in an Arc and a Mutex and stored in the server struct, and the rx side moved into the world thread. This seems to work fine.

To send data to clients of the server, one needs to use the handle.data() function. To enable the world thread to send data, I send the ChannelId and the session handle over the command channel in the channel_open_session() function (which initiates the SSH server session); the world thread then uses this handle to send messages via handle.data() (arbitrary text, e.g. describing the world) to the SSH user. However, I run into deadlocks, especially when the server's data() function sends information over the data channel and the world thread receives it and sends some text back to the user via handle.data(). Sometimes handle.data() also panics. I could establish that the world thread does not deadlock; it continues running as expected. I assume the deadlock happens somewhere in the thrussh server thread, but I am not sure where. The deadlock can be observed in that entering something on the client side no longer leads to any reaction on the server side.

The deadlock (and the panic) does not always occur. I tried to compile a minimal example that leads to the deadlock. I am aware that this example does not work with multiple clients, but I kept it this way to avoid the added code complexity of registering different clients. I tested the code with a single server and a single connection to it. Happy to provide a screen recording or access to a git repository if needed.

Any hint on where to focus my debugging efforts is greatly welcome. Maybe I am going about this the wrong way and there is a better way to send data from thrussh to another thread, and from another independent thread to thrussh users?

The server module (infrastructure::ssh_server):

extern crate thrussh;
extern crate futures;
extern crate tokio;

use std::sync::{Mutex, Arc};
use thrussh::*;
use thrussh::server::{Auth, Session};
use anyhow;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};

#[derive(Clone)]
pub struct Server {
    id: usize,
    tx_data_channel: Arc<Mutex<Sender<String>>>,
    tx_command_channel: Arc<Mutex<Sender<(ChannelId, thrussh::server::Handle)>>>, 
}

impl server::Server for Server {
    type Handler = Self;
    fn new(&mut self, _: Option<std::net::SocketAddr>) -> Self {
        let s = self.clone();
        self.id += 1;
        s
    }
}

impl server::Handler for Server {
    type Error = anyhow::Error;
    type FutureAuth = futures::future::Ready<Result<(Self, server::Auth), anyhow::Error>>;
    type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
    type FutureBool = futures::future::Ready<Result<(Self, Session, bool), anyhow::Error>>;

    fn finished_auth(self, auth: Auth) -> Self::FutureAuth { futures::future::ready(Ok((self, auth))) }
    fn finished_bool(self, b: bool, s: Session) -> Self::FutureBool { futures::future::ready(Ok((self, s, b))) }
    fn finished(self, s: Session) -> Self::FutureUnit {
        futures::future::ready(Ok((self, s)))
    }

    fn auth_password(self, _user: &str, password: &str) -> Self::FutureAuth {
        if password == "Test" {
            futures::future::ready(Ok((self, server::Auth::Accept)))
        } else {
            futures::future::ready(Ok((self, server::Auth::Reject)))
        }
    }

    //noinspection ALL
    fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {

        // Register client with the world - pass the handle to world thread
        //
        // This needs to be done to enable the world thread to send data to the
        // ssh user (eg. a description or a result).
        let handle = session.handle();
        futures::executor::block_on( async {
            let tx = self.tx_command_channel.lock().unwrap();
            if let Err(_) = tx.send((channel, handle.clone())).await {
                //error!("channel_open_session(): receiver dropped");
            } else {
                //info!("channel_open_session(): Sent client id and handle to world.")
            };
        });

        // Display a welcome message
        session.data(channel,CryptoVec::from_slice("Welcome.\r\n".as_ref()));
        futures::future::ready(Ok((self, session)))
    }


    fn data(self, _channel: ChannelId, data: &[u8], session: server::Session) -> Self::FutureUnit { 
        if data.as_ref() == "\u{000d}".as_bytes() {
            // Send data to world instance on CR 
            futures::executor::block_on( async {
                let tx = self.tx_data_channel.lock().unwrap();
                if let Err(_) = tx.send("Test".to_string()).await { 
                    println!("data(): receiver dropped");
                };
            });
        }
        futures::future::ready(Ok((self, session)))
    }
}

pub fn init_ssh_server() -> (Server, Arc<thrussh::server::Config>,
                             Receiver<String>, Receiver<(ChannelId, thrussh::server::Handle)>) {
    // Configure the server
    let mut config = thrussh::server::Config::default();
    config.connection_timeout = Some(std::time::Duration::from_secs(600));
    config.auth_rejection_time = std::time::Duration::from_secs(3);
    config.keys.push(thrussh_keys::key::KeyPair::generate_ed25519().unwrap());
    config.auth_banner = Some("Please log in with password \"Test\"\n");
    let config = Arc::new(config);

    // The data channel: The channel players use to send actions etc....
    let (data_tx, data_rx) = mpsc::channel(1_024);

    // The command channel: The channel used to send requests from the session to the world
    let (command_tx, command_rx) = mpsc::channel(1_024);


    // Create the server
    let sh = Server{
        id: 0,
        tx_data_channel: Arc::new(Mutex::new(data_tx.clone())),
        tx_command_channel: Arc::new(Mutex::new(command_tx.clone()))
    };

    (sh, config, data_rx, command_rx)
}

The main module:

mod infrastructure;

#[macro_use] extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate signal_hook;
extern crate serde;
use thrussh::CryptoVec;
use tracing::instrument;

#[instrument]
#[tokio::main]
async fn main() {
    // Set up logging
    env_logger::init();

    // Configure the ssh server
    let (sh, config,
        mut sender_data_rx, mut sender_command_rx)
        = infrastructure::ssh_server::init_ssh_server();

    // Spawn World Thread
    tokio::spawn(async move{
        // For sake of experiment only register first client
        let mut registered = false;
        // The session info about the one client we serve
        let mut client_session_info = None;

        loop {
            // See if there is a client registration
            // (for testing only the first to avoid hash maps etc....)
            if registered == false {
                client_session_info = match sender_command_rx.try_recv() {
                    Ok((cid, mut h)) => {
                        println!("Received command. Registering client {:?}",cid);
                        h.data(cid, CryptoVec::from_slice(
                            "Client registered to World!\r\n".as_ref()))
                            .await.expect("Could not send registration msg.");
                        registered = true;
                        Some((cid.clone(), h.clone()))
                    },
                    Err(_) => None
                };
            };

            // See if there is a player action or data
            let data = match sender_data_rx.try_recv() {
                Ok(d) => {
                    println!("Received data: {}", d);
                    Some(d)
                },
                Err(_) => None
            };

            // If we got data, then send something to the ssh user.
            match data {
                // For this debug we don't care about the actual data.
                Some(_) => {
                        match client_session_info.clone() {
                            Some((channel_id, mut handle)) => {
                                handle.data(
                                    channel_id, CryptoVec::from_slice(
                                        "Dummy message".as_ref()))
                                    .await.expect("Could not send data message to client.");
                            },
                            None => {}
                        }
                    },
                None => {}
            }
        }
    });

    // Start the ssh server and listen for incoming connections
    info!("Spawning ssh server listening at: {}", "0.0.0.0:2222");
    thrussh::server::run(config, "0.0.0.0:2222", sh).await.unwrap();
}

You are holding a synchronous Mutex (the one from std::sync) across an await point in an async fn. Rust's async fns work in a non-blocking, cooperative-multitasking fashion, so blocking on something for a long time is problematic. A call to lock on a Mutex can block for a long time if some user of the Mutex holds the lock for a long time, and the fact that you do a .send(...).await call while holding the lock means exactly that: your Mutex can stay locked for a long time.

There's an alternative Mutex with an async API that works well inside async fns: tokio::sync::Mutex. (For situations where the mutex is never locked for long, using std's Mutex is actually encouraged even in async fns; make sure to read the linked documentation of tokio::sync::Mutex for more information.)

Using the wrong Mutex can lock up a whole worker thread, and there are only so many of those. You can even run tokio single-threaded, in which case the effect can be an immediate deadlock. With a multi-threaded runtime, I believe a deadlock can only arise from this if it happens several times in parallel, until all the worker threads are blocked. I don't know whether this is the underlying error here, or whether you have a problem that would deadlock even with async Mutexes. In any case, this is useful information to know in general when working with async Rust.


In this concrete use-case you probably don't need a Mutex at all though:

You are wrapping a tokio::sync::mpsc::Sender in a Mutex. The "mp" stands for "multiple producer", i.e. the channel already supports multiple senders (the sender can be cloned) as well as concurrent access to the same sender without any problem. (Pay attention to the send method's API: it takes &self, not &mut self.) Hence you can call send directly on an Arc<Sender<...>>; there is no need for Arc<Mutex<Sender<...>>>.

Perhaps that already solves all your deadlocking problems.

Dear @steffahn

Thanks for your quick and very thorough reply. I agree that the Mutex is most likely unnecessary, and I learned new things (I am just starting with async in Rust). I also think the Arc is not necessary, although it should not have any influence beyond some overhead?

However, I removed the Mutex. Unfortunately this does not resolve the deadlock problem. I also tested with and without the Arc.

Looking at your code a second time, I notice the main function features a busy-waiting loop in the spawned task. That's a bad approach in both synchronous and asynchronous code. It might not be the reason for the deadlock either, but it should be avoided.

Instead, you should not use try_recv() in a loop, but do an actual recv().await in a loop. You can listen to multiple channels at once using select!, or you could spawn two independent tasks, one handling sender_command_rx and one handling sender_data_rx.

On that note, I don't know thrussh, so I don't know how often channel_open_session can be called. But you never receive more than one item from the "command" channel, and it's limited to 1_024 items, so there's some potential for problems. Re-reading your original post, it seems multiple sessions aren't tested anyway, so it's probably unimportant.


And another problem: the server::Handler trait seems to expect asynchronous code, but you call block_on, which will (synchronously) block on a future; another opportunity to lock up a whole worker thread. If data is called many times and the "data" channel fills up faster than it's consumed, reaching its limit, this can lock up the whole runtime. This might not be the deadlock you're facing either, because as far as I understand the API docs, there's only one Handler per client, and one Handler only supports one method call at a time.

Anyways.. for fixing this:

Filling in the Future... types of Handler: you could use BoxFuture<Result<(Self, Session), anyhow::Error>> for FutureUnit, which is the only one where you actually implement a method that needs to .await anything; the others, whose implementations don't need any awaits, can stay future::Ready for now.

In the implementation, instead of using block_on, use an async { ... } block (you might as well wrap the whole function body in one), bring FutureExt into scope to use the boxed method, and return that boxed future from the method. E.g.

    fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        async move {
            // Register client with the world - pass the handle to world thread
            //
            // This needs to be done to enable the world thread to send data to the
            // ssh user (eg. a description or a result).
            let handle = session.handle();
            if let Err(_) = self.tx_command_channel.send((channel, handle.clone())).await {
                //error!("channel_open_session(): receiver dropped");
            } else {
                //info!("channel_open_session(): Sent client id and handle to world.")
            };

            // Display a welcome message
            session.data(channel,CryptoVec::from_slice("Welcome.\r\n".as_ref()));
            Ok((self, session))
        }.boxed()
    }

Actually, this won't work for data because you can't capture the borrowed data: &[u8] in such a future. Since data is not used for the send call anyway, you can handle the check of data first:

    fn data(self, _channel: ChannelId, data: &[u8], session: server::Session) -> Self::FutureUnit { 
        if data.as_ref() == "\u{000d}".as_bytes() {
            async move {
                // Send data to world instance on CR 
                let tx = self.tx_data_channel.lock().unwrap();
                if let Err(_) = tx.send("Test".to_string()).await { 
                    println!("data(): receiver dropped");
                };

                Ok((self, session))
            }.boxed()
        } else {
            async move {
                Ok((self, session))
            }.boxed()
        }
    }

or

    fn data(self, _channel: ChannelId, data: &[u8], session: server::Session) -> Self::FutureUnit {
        let condition = data.as_ref() == "\u{000d}".as_bytes();
        async move {
            if condition {
                // Send data to world instance on CR 
                let tx = self.tx_data_channel.lock().unwrap();
                if let Err(_) = tx.send("Test".to_string()).await { 
                    println!("data(): receiver dropped");
                };
            }
            Ok((self, session))
        }.boxed()
    }

@steffahn Thanks for the suggestions; they make sense. I am a bit in a hurry, so I will look deeper into your suggested fix later. I wanted to try it quickly, but I struggled with the return type, as boxed() returns a Pin while the trait implementation of data expects a FutureUnit:

^ expected struct `futures::future::Ready`, found struct `Pin`

Anyhow, I will try it later and then let you know if I could resolve it.

You define FutureUnit yourself. As mentioned

i.e. I suggest you change the type FutureUnit = ... definition :wink:

Sometimes staring at the code for a long time makes you overlook the obvious ;-). Thanks for the hint about the type definition.

I changed the data() function to use the BoxFuture. This made sense to me, as I suspected the culprit to be block_on(). The suspicion was driven by the fact that the main busy-waiting loop still ran, but the spawned client connection from thrussh no longer responded to key presses. However, just changing this did not resolve the deadlock. (I tried to understand the problem better using tokio-console, but I am still too inexperienced with it to pin down the problem.)
I then also replaced the busy-waiting try_recv() loop with the select!/recv().await approach you proposed. And this seems to eliminate the deadlock. I don't yet really trust it, as I could not pin down the problem, but I could not get it to lock up anymore.

Thank you very much for your in-depth and patient help! Much appreciated. I post the seemingly working code below. Next I will implement the routines such that they support multiple clients and echo back the entered data. If this works, I will also post that solution in this thread, as I think others might have similar requirements. If the deadlock comes back I will ask again; fingers crossed it's gone :-).

The server module (infrastructure::ssh_server):

extern crate thrussh;
extern crate futures;
extern crate tokio;
use crate::futures::FutureExt;
use std::pin::Pin;

use std::sync::Arc;
use thrussh::*;
use thrussh::server::{Auth, Session};
use anyhow;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};

#[derive(Clone)]
pub struct Server {
    id: usize,
    tx_data_channel: Sender<String>,
    tx_command_channel: Sender<(ChannelId, thrussh::server::Handle)>, 
}

impl server::Server for Server {
    type Handler = Self;
    fn new(&mut self, _: Option<std::net::SocketAddr>) -> Self {
        let s = self.clone();
        self.id += 1;
        s
    }
}

impl server::Handler for Server {
    type Error = anyhow::Error;
    type FutureAuth = futures::future::Ready<Result<(Self, server::Auth), anyhow::Error>>;
    type FutureUnit = Pin<Box<dyn futures::Future<Output = Result<(Self, Session), anyhow::Error>> + std::marker::Send>>;
    type FutureBool = futures::future::Ready<Result<(Self, Session, bool), anyhow::Error>>;

    fn finished_auth(self, auth: Auth) -> Self::FutureAuth { futures::future::ready(Ok((self, auth))) }
    fn finished_bool(self, b: bool, s: Session) -> Self::FutureBool { futures::future::ready(Ok((self, s, b))) }
    fn finished(self, s: Session) -> Self::FutureUnit {
        Box::pin(futures::future::ready(Ok((self, s))))
    }

    fn auth_password(self, _user: &str, password: &str) -> Self::FutureAuth {
        if password == "Test" {
            futures::future::ready(Ok((self, server::Auth::Accept)))
        } else {
            futures::future::ready(Ok((self, server::Auth::Reject)))
        }
    }

    //noinspection ALL
    fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        async move {
            // Register client with the world - pass the handle to world thread
            //
            // This needs to be done to enable the world thread to send data to the
            // ssh user (eg. a description or a result).
            let handle = session.handle();
            if let Err(_) = self.tx_command_channel.send((channel, handle.clone())).await {
                error!("channel_open_session(): receiver dropped");
            } else {
                info!("channel_open_session(): Sent client id and handle to world.")
            };

            // Display a welcome message
            session.data(channel,CryptoVec::from_slice("Welcome.\r\n".as_ref()));
            Ok((self, session))
        }.boxed()
    }


    fn data(self, _channel: ChannelId, data: &[u8], session: server::Session) -> Self::FutureUnit { 
        let condition = data.as_ref() == "\u{000d}".as_bytes();
        let tx = self.tx_data_channel.clone();
        async move {
            if condition {
                // Send data to world instance on CR 
                if let Err(_) = tx.send("Test".to_string()).await { 
                    println!("data(): receiver dropped");
                };
            }
            Ok((self, session))
        }.boxed()
    }
}

pub fn init_ssh_server() -> (Server, Arc<thrussh::server::Config>,
                             Receiver<String>, Receiver<(ChannelId, thrussh::server::Handle)>) {
    // Configure the server
    let mut config = thrussh::server::Config::default();
    config.connection_timeout = Some(std::time::Duration::from_secs(600));
    config.auth_rejection_time = std::time::Duration::from_secs(3);
    config.keys.push(thrussh_keys::key::KeyPair::generate_ed25519().unwrap());
    config.auth_banner = Some("Please log in with password \"Test\"\n");
    let config = Arc::new(config);

    // The data channel: The channel players use to send actions etc....
    let (data_tx, data_rx) = mpsc::channel(1_024);

    // The command channel: The channel used to send requests from the session to the world
    let (command_tx, command_rx) = mpsc::channel(1_024);


    // Create the server
    let sh = Server{
        id: 0,
        tx_data_channel: data_tx.clone(),
        tx_command_channel: command_tx.clone()
    };

    (sh, config, data_rx, command_rx)
}

The main module:

mod infrastructure;

#[macro_use] extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate signal_hook;
extern crate serde;
use thrussh::CryptoVec;
use tracing::instrument;
// use tokio::{select, runtime::Handle};

#[instrument]
#[tokio::main]
async fn main() {
    // Set up logging
    env_logger::init();
    console_subscriber::init();

    // Configure the ssh server
    let (sh, config,
        mut sender_data_rx, mut sender_command_rx)
        = infrastructure::ssh_server::init_ssh_server();

    // Spawn World Thread
    tokio::spawn(async move{
        // For sake of experiment only register first client
        //let mut registered = false;
        // The session info about the one client we serve
        //let mut client_session_info = None;

        let mut handle = None;
        let mut channel_id = None;
        let data = None;

        loop {
            let mut hndl = handle.clone();
            let mut chan = channel_id.clone();
            let mut dta = data.clone();

            tokio::select! {
                Some((cid, mut h)) = sender_command_rx.recv() => {
                    info!("Got handle from command rx");
                    h.data(cid, CryptoVec::from_slice(
                        "Client registered to World!\r\n".as_ref()))
                        .await.expect("Could not send registration msg.");
                    hndl = Some(h);
                    chan = Some(cid);

                }
                Some(v) = sender_data_rx.recv() => {
                    info!("Got {:?} from data tx", v);
                    dta = Some(v);
                }
                else => {
                    error!("Both channels closed");
                }
            }

            match hndl {
                Some (mut h) => {
                    handle = Some(h.clone());
                    match chan {
                        Some(cid) => {
                            channel_id = Some(cid.clone());
                            match dta {
                                Some(d) => {
                                    info!("Sending data \"{}\" to client", d);
                                    h.data(cid, CryptoVec::from_slice(
                                    d.as_ref())).await.expect("Could not send data message to client.");
                                }
                                None => {}
                            }
                        },
                        None => {}
                    }
                },
                None => {}
            }
        } 
    });

    // Start the ssh server and listen for incoming connections
    info!("Spawning ssh server listening at: {}", "0.0.0.0:2222");
    thrussh::server::run(config, "0.0.0.0:2222", sh).await.unwrap();
}

As promised, here is a repository with the final example code that handles multiple connections without deadlocking. See readme for details.

@steffahn thanks again for the good guidance.
