Looking for threaded/async tips for a specific problem (handing over writer socket)

I'm writing a service which is supposed to do something very simple: accept "hash file" requests (a 3-tuple containing hash algorithm, identifier and filename) over loopback, hash the file, and return the hash to the connected client.

The service needs to be able to hash multiple files in parallel. (The id in the protocol is used to keep track of the request/reply pairing.)

I would like to implement this using the async framework; specifically I'm using async-std, but if there's good reason I can switch to tokio or any other crate.

Because some of the files can be huge I want to make sure that I don't block the executor thread, so the actual hashing is done on threads using a threadpool (which would also allow me to queue jobs and limit the number of threads it would launch).

Implementing the code that accepts connections from clients, reads the hash requests, parses them, starts a thread and hashes the file was easy. The issue I'm having is writing the result back to the client -- or rather getting a writer to the thread.

I've trimmed the code and inlined it (the playground didn't seem to have async_std):

use async_std::{
  io,
  prelude::*,
  task,
  net::{TcpListener, TcpStream, ToSocketAddrs},
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

use std::sync::Arc;
use std::path::PathBuf;
use threadpool::ThreadPool;

enum HashAlg {
  Sha2_256
}


struct Job {
  pathname: PathBuf,
  alg: HashAlg,
  id: String,
  //sock: io::BufWriter
}

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
  let listener = TcpListener::bind(addr).await?;
  let mut incoming = listener.incoming();
  while let Some(stream) = incoming.next().await {
    let stream = stream?;
    println!("Accepting from: {}", stream.peer_addr()?);
    let _handle = task::spawn(connection_loop(stream));
  }
  Ok(())
}

async fn connection_loop(stream: TcpStream) -> Result<()> {
  let reader = io::BufReader::new(&stream);
  let mut lines = reader.lines();

  let tpool = ThreadPool::new(4);

  while let Some(line) = lines.next().await {
    let line = line?;

    let (alg, rest) = match line.find(' ') {
      None => continue,
      Some(idx) => (&line[..idx], line[idx+1 ..].trim()),
    };
    let (id, fname) = match rest.find(' ') {
      None => continue,
      Some(idx) => (&rest[..idx], rest[idx+1 ..].trim()),
    };

    println!("alg: '{}'  id: '{}'  fname: '{}'", alg, id, fname);

    let alg = HashAlg::Sha2_256;

    let jobspec = Job { pathname: std::path::PathBuf::from(fname), alg,
      id: id.to_string() };
    tpool.execute(move || {
      println!("{}", jobspec.id);
      println!("Launched thread!");
    });
  }

  tpool.join();

  Ok(())
}

Because the hash results aren't actually required in the executor thread, my initial instinct was to try to pass a writer to the thread, and have the thread write the response once it's done. This however turned out to be more complicated than I had initially thought. (I'd need to split out the writer, and it would further need to be shared and protected.)

I'm wondering if I should be passing the results back to the executor somehow (it would allow me to not have to split out the writer and lock it) -- but I'm not sure how I would go about doing that.

Apparently there's a threadpool specifically for futures. I took a look at an example (futures_threadpool - Rust) and it looks promising, but I'm not sure how I would return the hash to the executor thread. (I don't want to join to wait for the result, because the executor needs to keep processing new hash requests.)

I get the feeling that I'm thinking about this problem completely wrong, so I'd be grateful for some hints'n'tips.

Instead of using a separate thread pool you can spawn the blocking code directly: both the async-std and tokio runtimes schedule blocking code onto separate threads, and async-std does so automatically with its new scheduler; see their post from today, async-std - Stop worrying about blocking: the new async-std runtime, inspired by Go. As soon as you make your hasher return a future you can await it and write the result back to the stream, or you can combine it with a timeout future via try_join!.

Though if you want to limit the number of hasher jobs running in parallel, that will be more complicated, as you will most likely need to implement your own async queue. A framework like actix is probably better suited for your purpose; it allows you to limit the number of workers as well as manage the connection backlog...


It looks like you're creating a new thread pool for every connection? Additionally you're calling join inside the asynchronous function and thus blocking the task.

What I would recommend is:

  1. Create the ThreadPool in accept_loop (or even further out)
  2. Pass the thread pool to every function that needs to spawn something. You may need to put it in an Arc to share it like this.
  3. Use channels to send the response back to the connection_loop.
  4. Don't bother joining the thread pool. You know you are done when you get the message on the channel.

As for the details with the channel, I would do this:

  1. Inside connection_loop: create a vector for channels. One for each task on the thread pool.
  2. The channel I would use is the oneshot channel from the futures crate.
  3. As you're spawning tasks on the thread pool, just put the channels into the vector.
  4. After spawning the tasks, iterate through the vector and await the channels. You can await them because the oneshot channel supports async await.

The advantage of this method compared to writing from the thread pool is that you are guaranteed to get the hashes in the correct order.


I decided to go a completely different route and try to use a custom future. The idea being that each HasherFuture launches a thread which hashes the file and reports back once it's done.

Using:

sha2 = "0.8.0"
hex = "0.4.0"

I used the example code from the async book and got this:

use {
  std::{
    future::Future,
    sync::{Arc,Mutex},
    task::{Context,Poll,Waker},
    pin::Pin,
    thread,
    path::PathBuf
  },
  sha2::{Sha256, Digest}
};


pub struct HasherFuture {
  shared_state: Arc<Mutex<SharedState>>
}

struct SharedState {
  waker: Option<Waker>,
  hash: Option<String>,
  id: String
}

pub struct HashRet {
  id: String,
  hash: String
}

impl Future for HasherFuture {
  type Output = HashRet;
  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut shared_state = self.shared_state.lock().unwrap();
    if let Some(hash) = &shared_state.hash {
      let hr = HashRet { id: shared_state.id.clone(), hash: hash.clone() };
      Poll::Ready(hr)
    } else {
      shared_state.waker = Some(ctx.waker().clone());
      Poll::Pending
    }
  }
}

impl HasherFuture {
  pub fn new(id: &str, fname: &PathBuf) -> Self {
    let shstate = SharedState { waker: None, hash: None, id: id.to_string() };
    let shared_state = Arc::new(Mutex::new(shstate));

    let mut file = std::fs::File::open(&fname).expect("Unable to open file");
    let mut sha256 = Sha256::new();

    let thread_shared_state = shared_state.clone();
    thread::spawn(move || {
      let _n = std::io::copy(&mut file, &mut sha256);
      let hash = sha256.result();
      let mut shared_state = thread_shared_state.lock().unwrap();
      shared_state.hash = Some(hex::encode(hash));
      if let Some(waker) = shared_state.waker.take() {
        waker.wake()
      }
    });

    HasherFuture { shared_state }
  }
}

How do I actually utilize this from, say, async-std? I know the async keyword is supposed to create a function which returns a future which needs to be fed to an executor -- but how does Rust know it is supposed to create a HasherFuture as a return type? I assume that's not how it's supposed to work, which means that I'm suffering from a rather large knowledge gap here.

You can simply call .await directly on values of type HasherFuture. E.g. if you wish to return such a future, just don't make your function async and use it as the return value directly. You should already be able to write HasherFuture::new(foo, bar).await.


This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.