Parallel ftp connections or too many mutexes/locks?


#1

Hi!
I just started learning Rust and, coming from the scripting/runtime world, I am still getting used to the new concepts Rust provides.

My question is quite specific, though, about threads, mutexes and locking:
To get more acquainted with the language, I am trying to write a small proof-of-concept parallel FTP uploader that uses a thread pool and a connection pool.
I have managed to appease the compiler to the point where I can run the program and connect to the server. I am just not sure whether the way I lock the resources really results in work being done in parallel, or whether I have just created an overly complicated sequential procedure that happens to use threads.

I hope someone can tell me whether I’m using the mutex locks correctly and give me feedback on what I could improve here:

extern crate threadpool;
extern crate ftp;

use threadpool::ThreadPool;
use ftp::FTPStream;

use std::sync::{Mutex, mpsc};
use std::sync::Arc;

fn main() {
  let connections = 4;
  let jobs = 8;

  let thread_pool = ThreadPool::new(connections);
  let (tx, rx) = mpsc::channel();

  // prepare connection pool
  let mut conn_pool = Vec::with_capacity(connections);
  for _ in 0..connections {
    let mut stream = FTPStream::connect("127.0.0.1".to_string(), 21).unwrap();
    stream.login("anonymous", "anonymous").unwrap();

    conn_pool.push(Mutex::new(stream));
  }

  let protected_conn_pool = Arc::new(Mutex::new(conn_pool));
  for i in 0..jobs {
    println!("starting thread {}", i);

    let tx = tx.clone();
    let protected_conn_pool = protected_conn_pool.clone(); // Arc++

    thread_pool.execute(move|| {
      let conn_pool = protected_conn_pool.lock().unwrap();
      let mut stream = None;

      // find a connection that is not in use
      conn_pool.iter().find(|&conn| {
        match conn.try_lock() {
          Ok(s) => { stream = Some(s); true },
          Err(_) => false
        }
      });

      // do something productive with the connection...
      match stream {
        Some(mut s) => {
          println!("DIR: {}", s.current_dir().unwrap());
        },
        None => { }
      }

      println!("finished thread {}", i);
      tx.send(()).unwrap();
    });
  }

  // wait for the all jobs to finish
  rx.iter().take(jobs).collect::<Vec<_>>();
}

#2

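Well… yes, that’s an overcomplicated sequential program. But you’re almost there! The guard returned by `protected_conn_pool.lock()` lives until the end of the closure (and the connection guard you take from it borrows from it), so every job holds the outer lock for its entire run and only one job can execute at a time. You don’t need to lock `conn_pool` at all, because you never mutate the vector itself: you only mutate its elements, and those are already guarded by their own mutexes. If you get rid of that outer lock and share the vector as an `Arc<Vec<Mutex<FTPStream>>>`, the jobs should run in parallel.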
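Dropping the outer lock could look roughly like the sketch below. It is an illustration under assumptions, not the poster’s code: `String`s stand in for FTP streams so no server is needed, plain `std::thread::spawn` replaces the threadpool crate, and `run_jobs` is a hypothetical helper name. The vector is shared as `Arc<Vec<Mutex<String>>>`, so each job only ever locks one connection. Without the outer lock, a single `find` scan is no longer guaranteed to hit a free connection (here there are 8 threads for 4 connections), and the original `None` arm would silently skip the job, so the sketch keeps retrying.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: runs `jobs` jobs over `connections` stand-in connections
// and returns one result string per job, in job order.
fn run_jobs(connections: usize, jobs: usize) -> Vec<String> {
    // Each connection has its own Mutex; the Vec itself is shared read-only
    // through the Arc, with no outer Mutex around it.
    let conn_pool: Arc<Vec<Mutex<String>>> = Arc::new(
        (0..connections)
            .map(|i| Mutex::new(format!("Connection {}", i)))
            .collect(),
    );

    let handles: Vec<_> = (0..jobs)
        .map(|i| {
            let conn_pool = Arc::clone(&conn_pool);
            thread::spawn(move || {
                // Scan for a connection whose try_lock succeeds; if all are busy,
                // yield and scan again instead of skipping the job.
                let conn = loop {
                    if let Some(guard) = conn_pool.iter().find_map(|c| c.try_lock().ok()) {
                        break guard;
                    }
                    thread::yield_now();
                };
                // "Use" the connection. The guard is dropped when the closure
                // returns, which frees the connection for other jobs.
                format!("job {} used {}", i, *conn)
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for result in run_jobs(4, 8) {
        println!("{}", result);
    }
}
```

Spinning on `try_lock` keeps the demo short, but it burns CPU while every connection is busy; a channel-based pool avoids that by blocking in `recv` until a connection comes back.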

Also, consider using a channel for connections instead of a vector with locks. That is, have the jobs take the connections off the channel as needed and put them back on when done.


#3

FYI, this is how I would do it (using Strings instead of ftp connections because I don’t have an FTP server to test on):

extern crate threadpool;

use threadpool::ThreadPool;

use std::sync::{Mutex, mpsc};
use std::sync::Arc;

fn main() {
    let connections = 4;
    let jobs = 8;

    let thread_pool = ThreadPool::new(connections);
    let (result_tx, result_rx) = mpsc::channel();

    // prepare connection pool
    let (conn_tx, conn_rx) = mpsc::sync_channel(connections);
    for i in 0..connections {
        let stream = format!("Connection {}", i);

        // The channel's capacity equals `connections`, so `try_send` can't
        // fail with `Full` here; unwrap asserts that.
        conn_tx.try_send(stream).unwrap();
    }

    let protected_conn_rx = Arc::new(Mutex::new(conn_rx));

    for i in 0..jobs {
        println!("starting job {}", i);

        let result_tx = result_tx.clone();

        let conn_tx = conn_tx.clone();
        let protected_conn_rx = protected_conn_rx.clone();

        thread_pool.execute(move || {
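            // Take a connection. The standard library's `mpsc` channel is
            // multi-producer, single-consumer, so the receiver has to be shared
            // behind a Mutex. The MutexGuard is a temporary dropped at the end of
            // this statement, so the lock is held only until `recv` returns.
            let conn = protected_conn_rx.lock().unwrap().recv().unwrap();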

            // Do something.
            println!("job executing: {}", conn);

            // Give back the connection. `try_send` never blocks, and it can't hit
            // `Full` because at most `connections` connections exist, so unwrap
            // asserts that fact.
            conn_tx.try_send(conn).unwrap();

            println!("finished job {}", i);
            result_tx.send(format!("job {} result", i)).unwrap();
        });
    }

    // Drop our own copy of `result_tx` so that `main` doesn't count as a sender.
    // Once every job has finished and dropped its clone, the channel disconnects
    // and the following loop ends.
    drop(result_tx);

    for result in result_rx {
        println!("result: {}", result);
    }
}
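The example above hands each connection back manually with `try_send`. One possible refinement, sketched here as an assumption rather than as part of the answer, is to wrap the checked-out connection in a guard whose `Drop` puts it back on the channel, so an early `return` inside a job can’t leak a connection. `Pool`, `PooledConn`, `get` and `run_jobs` are hypothetical names; `String`s again stand in for FTP streams, and `std::thread::scope` (Rust 1.63+) replaces the threadpool crate so the pool can be borrowed instead of wrapped in an `Arc`.

```rust
use std::ops::Deref;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Mutex;
use std::thread;

// Hypothetical pool: connections travel through a bounded channel, as in the
// example above, and are handed out wrapped in a guard.
struct Pool<T> {
    tx: SyncSender<T>,
    rx: Mutex<Receiver<T>>,
}

// Hypothetical guard: gives its connection back to the pool when dropped.
struct PooledConn<'a, T> {
    conn: Option<T>,
    pool: &'a Pool<T>,
}

impl<T> Pool<T> {
    fn new(conns: Vec<T>) -> Self {
        let (tx, rx) = sync_channel(conns.len());
        for c in conns {
            // Capacity equals the number of connections, so this can't be `Full`.
            tx.try_send(c).unwrap();
        }
        Pool { tx, rx: Mutex::new(rx) }
    }

    // Blocks until a connection is free. The MutexGuard is a temporary, so the
    // receiver lock is released as soon as `recv` returns.
    fn get(&self) -> PooledConn<'_, T> {
        let conn = self.rx.lock().unwrap().recv().unwrap();
        PooledConn { conn: Some(conn), pool: self }
    }
}

impl<T> Deref for PooledConn<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // `conn` only becomes `None` inside `drop`, after which no deref happens.
        self.conn.as_ref().unwrap()
    }
}

impl<T> Drop for PooledConn<'_, T> {
    fn drop(&mut self) {
        if let Some(c) = self.conn.take() {
            // There is always room (only `capacity` connections exist) and the
            // pool owning the receiver outlives this borrow; the result is still
            // ignored so that `drop` never panics.
            let _ = self.pool.tx.try_send(c);
        }
    }
}

// Hypothetical driver mirroring the example above, using scoped threads.
fn run_jobs(connections: usize, jobs: usize) -> Vec<String> {
    let pool = Pool::new((0..connections).map(|i| format!("Connection {}", i)).collect());
    thread::scope(|s| {
        let handles: Vec<_> = (0..jobs)
            .map(|i| {
                let pool = &pool;
                s.spawn(move || {
                    let conn = pool.get();
                    // No explicit hand-back: dropping `conn` returns it.
                    format!("job {} used {}", i, *conn)
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    for result in run_jobs(4, 8) {
        println!("{}", result);
    }
}
```

Because the guard borrows the pool, this version relies on scoped threads. With the threadpool crate, whose jobs must be `'static`, you would instead keep the pool in an `Arc` and have the guard hold a cloned `SyncSender` (or an `Arc` of the pool) rather than a reference.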

#4

Thank you! (I was actually googling for bidirectional channels in Rust just before you posted the example…)
That also looks like a much nicer solution than trying to harass a vector for available connections.
Much appreciated!