I'm writing a service which is supposed to do something very simple: Accept "hash file" requests (a 3-tuple containing hash algorithm, identifier and filename) over loopback, hash the file, and return the hash to the connected client.
The service needs to be able to hash multiple files in parallel. (The id in the protocol is used to keep track of the request/reply pairing.)
I would like to implement this using an async framework; specifically I'm using async-std, but if there's good reason I can switch to tokio or any other crate.
Because some of the files can be huge I want to make sure that I don't block the executor thread, so the actual hashing is done on threads using a threadpool (which would also allow me to queue jobs and limit the number of threads it would launch).
Implementing the code that accepts connections from clients, reads and parses the hash requests, and starts a thread to hash the file was easy. The issue I'm having is writing the result back to the client -- or rather, getting a writer into the thread.
I've trimmed the code and inlined it (the playground didn't seem to have async_std):
```rust
use async_std::{
    io,
    prelude::*,
    task,
    net::{TcpListener, TcpStream, ToSocketAddrs},
};
use std::path::PathBuf;
use threadpool::ThreadPool;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

enum HashAlg {
    Sha2_256,
}

struct Job {
    pathname: PathBuf,
    alg: HashAlg,
    id: String,
    //sock: io::BufWriter
}

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        let _handle = task::spawn(connection_loop(stream));
    }
    Ok(())
}

async fn connection_loop(stream: TcpStream) -> Result<()> {
    let reader = io::BufReader::new(&stream);
    let mut lines = reader.lines();
    let tpool = ThreadPool::new(4);
    while let Some(line) = lines.next().await {
        let line = line?;
        // Request line format: "<alg> <id> <filename>"
        let (alg, rest) = match line.find(' ') {
            None => continue,
            Some(idx) => (&line[..idx], line[idx + 1..].trim()),
        };
        let (id, fname) = match rest.find(' ') {
            None => continue,
            Some(idx) => (&rest[..idx], rest[idx + 1..].trim()),
        };
        println!("alg: '{}' id: '{}' fname: '{}'", alg, id, fname);
        let alg = HashAlg::Sha2_256;
        let jobspec = Job {
            pathname: PathBuf::from(fname),
            alg,
            id: id.to_string(),
        };
        tpool.execute(move || {
            println!("{}", jobspec.id);
            println!("Launched thread!");
        });
    }
    tpool.join();
    Ok(())
}
```
Because the hash results aren't actually required in the executor thread, my initial instinct was to try to pass a writer to the thread and have the thread write the response itself once it's done. This turned out to be more complicated than I had initially thought. (I'd need to split out the write half of the stream, and it would further need to be shared and protected by a lock.)
I'm wondering if I should instead be passing the results back to the executor somehow (that would let me avoid splitting out the writer and locking it) -- but I'm not sure how I would go about doing that.
Apparently there's a threadpool specifically for futures. I took a look at an example (futures_threadpool - Rust) and it looks promising, but I'm not sure how I would return the hash to the executor thread. (I don't want to join to wait for the result, because the executor needs to keep processing new hash requests.)
I get the feeling that I'm thinking about this problem completely wrong, so I'd be grateful for some hints'n'tips.