I am trying to build an SSH server using the thrussh crate that receives commands from clients and forwards them to a world thread, which can then send replies to the clients (players). I hit a problem (the SSH server thread seems to deadlock somewhere) that I am unsure how to approach. I tried to keep the explanation short and hopefully clear; if it is not, I am happy to rephrase.
To communicate from the SSH server to the world thread, I open a command channel and a data channel, both created with mpsc::channel. The tx side of each is wrapped in an Arc and a Mutex and stored in the server struct; the rx side is moved into the world thread. This seems to work fine.
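To make the channel layout concrete, here is a minimal sketch of the server-to-world direction. It is not my actual code: std::sync::mpsc and a plain thread stand in for tokio::sync::mpsc and the tokio task, and DemoServer, spawn_world and demo_server_to_world are hypothetical names. The sketch clones the sender per connection instead of wrapping it in Arc<Mutex<...>>; as far as I understand, tokio's Sender can be cloned the same way, but I have not switched my code over.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for the per-connection handler struct.
#[derive(Clone)]
struct DemoServer {
    // The sender is Clone, so each connection can own its own copy.
    tx_data: Sender<String>,
}

/// Spawns a "world" thread that collects every message it receives.
/// The thread finishes once all senders have been dropped.
fn spawn_world(rx: Receiver<String>) -> thread::JoinHandle<Vec<String>> {
    thread::spawn(move || rx.iter().collect())
}

/// Sends two messages from two cloned handlers and returns what the world saw.
fn demo_server_to_world() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let world = spawn_world(rx);

    let server = DemoServer { tx_data: tx };
    // One clone per connection, similar to `self.clone()` in `Server::new()`.
    let conn_a = server.clone();
    let conn_b = server.clone();

    conn_a.tx_data.send("look".to_string()).unwrap();
    conn_b.tx_data.send("north".to_string()).unwrap();

    // Drop every sender so the world's receive loop ends.
    drop((server, conn_a, conn_b));
    world.join().unwrap()
}
```

Calling demo_server_to_world() returns the messages in the order they were sent, because both sends happen one after the other on the same thread.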
To send data to clients of the server, one has to call handle.data(). To let the world thread send data, my idea was to send the ChannelId (which serves as the client id) and the session handle over the command channel to the world thread from channel_open_session(), the function that initiates the SSH server session. The world thread then uses this handle to send messages (arbitrary text, e.g. a description of the world) to the SSH user through handle.data(). However, I run into deadlocks, especially when server::data() sends information over the data channel and the world thread, on receiving it, sends some text back to the user with handle.data(). Sometimes handle.data() also panics. I was able to establish that the world thread does not deadlock; it continues running as expected. I assume the deadlock happens somewhere in the thrussh server thread, but I am not sure where. The deadlock shows up as follows: entering something on the client side no longer triggers any reaction on the server side.
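The intended register-then-reply flow can be sketched without thrussh as follows. This is only an illustration under assumptions: a std Sender<String> stands in for the thrussh handle, a u32 stands in for the ChannelId, and WorldMsg and demo_register_and_reply are hypothetical names. Unlike my real code, the sketch merges the command and data channels into one enum-typed channel so that the world can block on a single receive instead of polling with try_recv().

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical message type; my real code uses two separate channels.
enum WorldMsg {
    /// A new client: its id plus a way to write back to it.
    Register(u32, Sender<String>),
    /// Input from the client with the given id.
    Data(u32, String),
}

/// Registers one client, sends one line of input, and returns
/// everything the world wrote back to that client.
fn demo_register_and_reply() -> Vec<String> {
    let (world_tx, world_rx) = mpsc::channel::<WorldMsg>();

    let world = thread::spawn(move || {
        // Only one client is tracked, as in my test code.
        let mut client: Option<(u32, Sender<String>)> = None;
        // Blocks until a message arrives; ends when all senders are gone.
        for msg in world_rx {
            match msg {
                WorldMsg::Register(id, handle) => {
                    handle
                        .send("Client registered to World!".to_string())
                        .unwrap();
                    client = Some((id, handle));
                }
                WorldMsg::Data(id, _text) => {
                    if let Some((cid, handle)) = &client {
                        if *cid == id {
                            handle.send("Dummy message".to_string()).unwrap();
                        }
                    }
                }
            }
        }
    });

    // The "session" side: register, then send one line of input.
    let (client_tx, client_rx) = mpsc::channel::<String>();
    world_tx.send(WorldMsg::Register(1, client_tx)).unwrap();
    world_tx.send(WorldMsg::Data(1, "Test".to_string())).unwrap();

    // Dropping the last sender lets the world thread finish.
    drop(world_tx);
    world.join().unwrap();
    client_rx.iter().collect()
}
```

In this std-only version the world thread never spins, since it sleeps inside the blocking receive. Whether the same structure carries over to thrussh and tokio is exactly the part I am unsure about.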
The deadlock (and the panic) does not always occur. I tried to put together a minimal example that leads to the deadlock. I am aware that this example does not work with multiple clients, but I kept it this way to avoid the extra code complexity of registering different clients. I tested the code with a single server and a single connection to it. I am happy to provide a screen recording or give access to a git repository if needed.
Any hint on where to focus my debugging efforts is very welcome. It may also be that I am going about this the wrong way: is there a better way to send data from thrussh to another thread, and from another independent thread to thrussh users?
The server module (infrastructure::ssh_server):
extern crate thrussh;
extern crate futures;
extern crate tokio;

use std::sync::{Mutex, Arc};
use thrussh::*;
use thrussh::server::{Auth, Session};
use anyhow;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};

#[derive(Clone)]
pub struct Server {
    id: usize,
    tx_data_channel: Arc<Mutex<Sender<String>>>,
    tx_command_channel: Arc<Mutex<Sender<(ChannelId, thrussh::server::Handle)>>>,
}

impl server::Server for Server {
    type Handler = Self;

    fn new(&mut self, _: Option<std::net::SocketAddr>) -> Self {
        let s = self.clone();
        self.id += 1;
        s
    }
}

impl server::Handler for Server {
    type Error = anyhow::Error;
    type FutureAuth = futures::future::Ready<Result<(Self, server::Auth), anyhow::Error>>;
    type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
    type FutureBool = futures::future::Ready<Result<(Self, Session, bool), anyhow::Error>>;

    fn finished_auth(self, auth: Auth) -> Self::FutureAuth { futures::future::ready(Ok((self, auth))) }

    fn finished_bool(self, b: bool, s: Session) -> Self::FutureBool { futures::future::ready(Ok((self, s, b))) }

    fn finished(self, s: Session) -> Self::FutureUnit {
        futures::future::ready(Ok((self, s)))
    }

    fn auth_password(self, _user: &str, password: &str) -> Self::FutureAuth {
        if password == "Test" {
            futures::future::ready(Ok((self, server::Auth::Accept)))
        } else {
            futures::future::ready(Ok((self, server::Auth::Reject)))
        }
    }

    //noinspection ALL
    fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        // Register client with the world - pass the handle to world thread
        //
        // This needs to be done to enable the world thread to send data to the
        // ssh user (eg. a description or a result).
        let handle = session.handle();
        futures::executor::block_on( async {
            let tx = self.tx_command_channel.lock().unwrap();
            if let Err(_) = tx.send((channel, handle.clone())).await {
                //error!("channel_open_session(): receiver dropped");
            } else {
                //info!("channel_open_session(): Sent client id and handle to world.")
            };
        });
        // Display a welcome message
        session.data(channel, CryptoVec::from_slice("Welcome.\r\n".as_ref()));
        futures::future::ready(Ok((self, session)))
    }

    fn data(self, _channel: ChannelId, data: &[u8], session: server::Session) -> Self::FutureUnit {
        if data.as_ref() == "\u{000d}".as_bytes() {
            // Send data to world instance on CR
            futures::executor::block_on( async {
                let tx = self.tx_data_channel.lock().unwrap();
                if let Err(_) = tx.send("Test".to_string()).await {
                    println!("data(): receiver dropped");
                };
            });
        }
        futures::future::ready(Ok((self, session)))
    }
}

pub fn init_ssh_server() -> (Server, Arc<thrussh::server::Config>,
                             Receiver<String>, Receiver<(ChannelId, thrussh::server::Handle)>) {
    // Configure the server
    let mut config = thrussh::server::Config::default();
    config.connection_timeout = Some(std::time::Duration::from_secs(600));
    config.auth_rejection_time = std::time::Duration::from_secs(3);
    config.keys.push(thrussh_keys::key::KeyPair::generate_ed25519().unwrap());
    config.auth_banner = Some("Please log in with password \"Test\"\n");
    let config = Arc::new(config);

    // The data channel: The channel players use to send actions etc....
    let (data_tx, data_rx) = mpsc::channel(1_024);
    // The command channel: The channel used to send requests from the session to the world
    let (command_tx, command_rx) = mpsc::channel(1_024);

    // Create the server
    let sh = Server{
        id: 0,
        tx_data_channel: Arc::new(Mutex::new(data_tx.clone())),
        tx_command_channel: Arc::new(Mutex::new(command_tx.clone()))
    };
    (sh, config, data_rx, command_rx)
}
The main module:
mod infrastructure;

#[macro_use] extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate signal_hook;
extern crate serde;

use thrussh::CryptoVec;
use tracing::instrument;

#[instrument]
#[tokio::main]
async fn main() {
    // Set up logging
    env_logger::init();

    // Configure the ssh server
    let (sh, config,
        mut sender_data_rx, mut sender_command_rx)
        = infrastructure::ssh_server::init_ssh_server();

    // Spawn World Thread
    tokio::spawn(async move{
        // For sake of experiment only register first client
        let mut registered = false;
        // The session info about the one client we serve
        let mut client_session_info = None;
        loop {
            // See if there is a client registration
            // (for testing only the first to avoid hash maps etc....)
            if registered == false {
                client_session_info = match sender_command_rx.try_recv() {
                    Ok((cid, mut h)) => {
                        println!("Received command. Registering client {:?}",cid);
                        h.data(cid, CryptoVec::from_slice(
                            "Client registered to World!\r\n".as_ref()))
                            .await.expect("Could not send registration msg.");
                        registered = true;
                        Some((cid.clone(), h.clone()))
                    },
                    Err(_) => None
                };
            };
            // See if there is a player action or data
            let data = match sender_data_rx.try_recv() {
                Ok(d) => {
                    println!("Received data: {}", d);
                    Some(d)
                },
                Err(_) => None
            };
            // If we got data, then send something to the ssh user.
            match data {
                // For this debug we don't care about the actual data.
                Some(_) => {
                    match client_session_info.clone() {
                        Some((channel_id, mut handle)) => {
                            handle.data(
                                channel_id, CryptoVec::from_slice(
                                    "Dummy message".as_ref()))
                                .await.expect("Could not send data message to client.");
                        },
                        None => {}
                    }
                },
                None => {}
            }
        }
    });

    // Start the ssh server and listen for incoming connections
    info!("Spawning ssh server listening at: {}", "0.0.0.0:2222");
    thrussh::server::run(config, "0.0.0.0:2222", sh).await.unwrap();
}