I am looking for a rust libraries which are similar to paramiko in python.
So I found SSH2, but I am not sure how to use it.
We use russh as an SSH client akin to Python's Paramiko. The examples are fairly clear if you're just trying to do simple SSH access to a remote host, but it does heavily depend on async
and Tokio.
Thank you for your kind attention.
Could you provide me examples and materials about the russh?
Actually I am converting the python code into rust.
This is python code.
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
client.connect(host['host'],
port=host['port'],
username=host['user'],
password=host['pwd'])
Could you give me some advice about this?
Did you try adapting one of the examples? Can you post code that you tried, and ask specific questions?
Since this depends on async and Tokio (as they said), be sure you've first learned to use these, to some degree. There is a good async book and Tokio tutorial.
Also note that if you're not already using Tokio, you'll have to do that, so you may want to first determine whether this is practical for your application. Please say more about what you're doing -- what you've already done, and what the next step is.
Thank you for your support.
This is the source code I tried to convert.
import commune as c
import streamlit as st
from typing import *
from typing import Any, Union
from paramiko.client import SSHClient, AutoAddPolicy
from rich.console import Console
import os
class Remote(c.Module):
filetype = 'yaml'
root_path = root = os.path.dirname(os.path.dirname(__file__))
libpath = lib_path = os.path.dirname(root_path)
datapath = os.path.join(libpath, 'data')
host_data_path = f'{datapath}/hosts.{filetype}'
host_url = 'https://raw.githubusercontent.com/communeai/commune/main/hosts.yaml'
executable_path='commune/bin/c'
@classmethod
def ssh_cmd(cls, *cmd_args,
host:str= None,
cwd:str= None,
verbose=False,
sudo=False,
key=None,
timeout=10,
**kwargs ):
self = cls()
command = ' '.join(cmd_args).strip()
if command.startswith('c '):
command = command.replace('c ', cls.executable_path + ' ')
if cwd != None:
command = f'cd {cwd} && {command}'
hosts = cls.hosts()
host_name = host
if host_name == None:
host = cls.c_choice(list(hosts))
print("host is what", host)
host_name = host
if host_name not in hosts:
raise Exception(f'Host {host_name} not found')
host = hosts[host_name]
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
client.connect(host['host'],
port=host['port'],
username=host['user'],
password=host['pwd'])
if sudo and host['user'] != "root":
command = "sudo -S -p '' %s" % command
stdin, stdout, stderr = client.exec_command(command)
try:
if sudo:
stdin.write(host['pwd'] + "\n")
stdin.flush()
color = self.c_random_color()
outputs = {'error': '', 'output': ''}
for line in stdout.readlines():
if verbose:
self.c_print(f'[bold]{host_name}[/bold]', line.strip('\n'), color=color)
outputs['output'] += line
for line in stderr.readlines():
if verbose:
self.c_print(f'[bold]{host_name}[/bold]', line.strip('\n'))
outputs['error'] += line
if len(outputs['error']) == 0:
outputs = outputs['output']
except Exception as e:
self.c_print(e)
return outputs
@classmethod
def c_choice(cls, options:Union[list, dict])->list:
self = cls()
import random
options = self.c_copy(options)
if len(options) == 0:
return None
if isinstance(options, dict):
options = list(options.values())
assert isinstance(options, list),'options must be a list'
return random.choice(options)
@classmethod
def c_copy(cls, data: Any) -> Any:
import copy
return copy.deepcopy(data)
@classmethod
def c_random_color(cls):
import random
return random.choice(cls.c_colors())
@classmethod
def c_colors(cls):
return ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', 'bright_black', 'bright_red', 'bright_green', 'bright_yellow', 'bright_blue', 'bright_magenta', 'bright_cyan', 'bright_white']
@classmethod
def c_print(cls, *text:str,
color:str=None,
verbose:bool = True,
console: Console = None,
flush:bool = False,
**kwargs):
if verbose:
if color == 'random':
color = cls.c_random_color()
if color:
kwargs['style'] = color
console = cls.resolve_console(console, **kwargs)
try:
if flush:
console.print(**kwargs, end='\r')
console.print(*text, **kwargs,)
except Exception as e:
print(e,)
@classmethod
def c_resolve_console(cls, console = None, **kwargs):
if not hasattr(cls,'console'):
cls.console = Console(**kwargs)
if console is not None:
cls.console = console
return cls.console
@classmethod
def add_host(cls,
cmd:str = None , # in the format of
host:str = '0.0.0.0',
port:int = 22,
user:str = 'root',
pwd:str = None,
name : str = None
):
hosts = cls.hosts()
host = {
'host': host,
'port': port,
'user': user,
'pwd': pwd
}
if name == None:
cnt = 0
name = f'{user}{cnt}'
while name in hosts:
name = f'{user}{cnt}'
cnt += 1
hosts[name] = host
cls.save_hosts(hosts)
return {'status': 'success', '': f'Host added', }
@classmethod
def save_hosts(cls, hosts=None, filetype=filetype, path = None):
if path == None:
path = cls.host_data_path
if hosts == None:
hosts = cls.hosts()
if filetype == 'json':
cls.put_json(path, hosts)
elif filetype == 'yaml':
cls.put_yaml(path, hosts)
return {
'status': 'success',
'msg': f'Hosts saved',
'hosts': hosts,
'path': cls.host_data_path,
'filetype': filetype
}
@classmethod
def load_hosts(cls, path = None, filetype=filetype):
if path == None:
path = cls.host_data_path
if filetype == 'json':
return cls.get_json(path, {})
elif filetype == 'yaml':
return cls.get_yaml(path, {})
@classmethod
def switch_hosts(cls, path):
hosts = c.get_json(path)
cls.save_hosts(hosts)
return {'status': 'success', 'msg': f'Host data path switched to {path}'}
@classmethod
def rm_host(cls, name):
hosts = cls.hosts()
if name in hosts:
del hosts[name]
cls.save_hosts( hosts)
c.print(cls.hosts())
return {'status': 'success', 'msg': f'Host {name} removed'}
else:
return {'status': 'error', 'msg': f'Host {name} not found'}
@classmethod
def hosts(cls, search=None, filetype=filetype, enable_search_terms: bool = False):
hosts = cls.load_hosts(filetype=filetype)
if len(hosts) == 0:
assert False, f'No hosts found, please add your hosts to {cls.host_data_path}'
if search != None:
hosts = {k:v for k,v in hosts.items() if search in k}
if enable_search_terms:
return cls.filter_hosts(hosts=hosts)
return hosts
host_map = hosts
@classmethod
def host2ip(cls, search=None):
hosts = cls.hosts(search=search)
return {k:v['host'] for k,v in hosts.items()}
@classmethod
def ip2host(cls, search=None):
host2ip = cls.host2ip(search=search)
return {v:k for k,v in host2ip.items()}
@classmethod
def names(cls, search=None):
return list(cls.hosts(search=search).keys())
def host2name(self, host):
hosts = self.hosts()
for name, h in hosts.items():
if h == host:
return name
raise Exception(f'Host {host} not found')
@classmethod
def n(cls, search=None):
return len(cls.hosts(search=search))
def num_servers(self):
return len(self.servers())
@classmethod
def n_servers(self):
return len(self.servers())
@classmethod
def host(self, name):
hosts = self.hosts()
if name not in hosts:
raise Exception(f'Host {name} not found')
return hosts[name]
...
Here, I want to optimize the code and replace threads and HTTP request with Rust.
I'd be really thank you if you just give me advices or directions.
How will this Rust code be called: Will you create a Rust library and call it from Python code? Or will you create a new Rust executable?
Also, how well do you know Rust? I think it would be a mistake to use this project as the first thing to try in Rust. I'm asking because you haven't indicated anything about your plan for doing this, and it seems you're asking for someone else to come up with a plan.
If you have some Rust experience and want to proceed, I suggest you look at the examples that were mentioned, try to find one that roughly fits what you need, try using it, and then ask specific questions about it.
I would adapt russh/russh/examples/client_exec_interactive.rs at main · warp-tech/russh · GitHub to your needs - it's the second example in the list from the link I gave you.
Thank you for your selfless support.
This seems like a very difficult project for me.
I will analyze it further and then ask for your help.
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.