Looking for advice on IPC performance using Unix Domain Sockets

I'm implementing a Postfix transport, and I'm looking for advice on how to milk some more performance out of it. I have a client and a daemon. Postfix executes the client for every single incoming email that matches a transport rule. The client reads the email from stdin and forwards it over a UDS where my daemon is listening. This can happen concurrently, with an upper limit of about 1000 clients at one time. I've included a simplified version of my program below:

Client

use std::os::unix::net::UnixStream;
use std::io::prelude::*;
use std::io::stdin;
use std::path::Path;
use std::env;
use std::process::exit;

fn main() {
    let mut args = env::args().skip(1);
    let socket_file_path = args.next().expect(
        "Could not parse socket file path from command args",
    );
    let mut email_body: Vec<u8> = Vec::with_capacity(1024);
    stdin().read_to_end(&mut email_body).expect(
        "Could not read email from stdin",
    );
    match UnixStream::connect(Path::new(&socket_file_path)) {
        Ok(mut stream) => {
            if let Err(e) = stream.write_all(email_body.as_ref()) {
                eprintln!("Writing to socket failed due to {}", e);
                exit(1);
            }
        }
        Err(e) => {
            eprintln!("Connecting to socket failed due to {}", e);
            exit(1);
        }
    }
}

Daemon

use std::env;
use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::process::exit;
use std::thread;


fn handle_stream(mut stream: UnixStream) {
    let mut buf = Vec::with_capacity(1024);
    stream.read_to_end(&mut buf).expect(
        "Could not read to end",
    );
}


pub fn main() {
    let mut args = env::args().skip(1);
    let socket_file_path = args.next().expect(
        "Could not parse socket file path from command args",
    );
    match UnixListener::bind(Path::new(&socket_file_path)) {
        Err(e) => {
            eprintln!("Could not bind listener due to {}", e);
            exit(1);
        }
        Ok(listener) => {
            for conn in listener.incoming() {
                match conn {
                    Err(e) => {
                        eprintln!("Could not accept connection due to {}", e);
                        exit(1)
                    }
                    Ok(stream) => {
                        thread::spawn(|| handle_stream(stream));
                    }
                }
            }
        }
    }
}

Timing

This is how I'm testing it:

test.sh

#!/bin/bash
for ((i=0;i<100000;i++))
do
  echo "this is an email" | ./client /tmp/socket.s &
done
wait

Which I run like below:

./daemon /tmp/socket.s &
time ./test.sh

It takes a little over a minute on my Mac and a little under a minute on my work computer:

# Mac
real	1m19.161s
user	3m45.578s
sys	3m6.898s

# Work CentOS
real	0m58.010s
user	3m27.182s
sys	2m57.073s

I've tried a couple of different approaches to make it more efficient, such as using a threadpool, but none of them made a noticeable difference. I tried tokio-uds as well:

extern crate futures;
extern crate num_cpus;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_uds;

use std::env;
use std::path::{Path, PathBuf};

use futures::Future;
use futures::stream::Stream;
use tokio_io::io::read_to_end;
use tokio_uds::UnixListener;
use tokio_core::reactor::Core;

fn main() {
    let addr = env::args().nth(1).expect("Could not parse socket path address");
    let path = PathBuf::from(addr);
    read(&path);
}

fn read(addr: &Path) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let listener = UnixListener::bind(addr, &handle).unwrap();

    let server = listener.incoming().for_each(move |(socket, _)| {
        let buf = Vec::new();
        let msg = read_to_end(socket, buf).map(move |(_, _)| {
            ()
        }).map_err(move |e| panic!(e));
        handle.spawn(msg);
        Ok(())
    });

    core.run(server).unwrap();
}

But the results weren't very promising:

Mac
real	1m58.518s
user	5m42.730s
sys	4m36.260s

Work CentOS
real	0m59.991s
user	3m46.163s
sys	2m54.927s

I feel like sending a single line over 100,000 sockets shouldn't take a full minute; is there anything I'm missing?

Before getting into the nitty-gritty: is this a release build of the client and daemon? There's substantial system time, so the build profile alone won't explain it either way. But it's a good sanity check before talking performance :).

It's a release build :). Always a good sanity check.

Ok, thanks for confirming.

Just curious: will the real setup involve spawning a new client process per email? Is that why the benchmark is set up like that? I'd have expected a single client to send 100k messages in a loop, rather than 100k client process instances. A good chunk of the time is going to go into process launching and the setup code you have there, and you're going to end up flooding the system with runnable tasks: 100k client processes get created, plus another 100k daemon threads, one per client handled. But maybe this is intentional, so ...

A couple of things jump out:

  1. Is there a reason the daemon offloads the processing to a new thread? I understand that you tried a threadpool without any impact (a bit surprising, actually), but I'm curious why you do that at all. In the real application I suppose you'll do some non-trivial processing, and offloading makes sense there. But for the benchmark here, it would be interesting to see results without it.

  2. The fresh Vec allocation in each handle_stream is less than stellar. But this shouldn't be a significant contributor - just something to perhaps consider in your real program.

My hunch is something's up with your test harness, probably all the fresh client setup. It would be worth timing a single client sending 100k messages in a loop. The very high sys time is an indication that you're in the kernel way too much.
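
To make that suggestion concrete, here is a rough std-only sketch of such a benchmark: one client loop opening sequential connections, and a daemon that handles each connection inline on the accepting thread while reusing a single buffer. The names `send_messages`, `serve_inline`, and `run_benchmark`, the temp-dir socket path, and the message count are all made up for illustration; both sides run in one process here only so the example is self-contained.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Instant;

/// Client side: one process, `n` sequential connections, one message each.
/// Note: on some platforms a full accept backlog may surface as
/// ConnectionRefused; this sketch does not retry in that case.
fn send_messages(path: &Path, body: &[u8], n: usize) -> io::Result<()> {
    for _ in 0..n {
        let mut stream = UnixStream::connect(path)?;
        stream.write_all(body)?;
        // `stream` is dropped at the end of each iteration, which closes the
        // socket; the daemon sees that as EOF.
    }
    Ok(())
}

/// Daemon side: handle each connection inline on the accepting thread,
/// reusing one buffer. Returns the total number of bytes read.
fn serve_inline(listener: &UnixListener, n: usize) -> io::Result<usize> {
    let mut buf = Vec::with_capacity(1024);
    let mut total = 0;
    for conn in listener.incoming().take(n) {
        let mut stream = conn?;
        buf.clear(); // keeps the allocation, discards the previous message
        total += stream.read_to_end(&mut buf)?;
    }
    Ok(total)
}

/// Runs client and daemon in one process and returns the bytes received.
fn run_benchmark(path: PathBuf, n: usize) -> io::Result<usize> {
    let _ = std::fs::remove_file(&path); // ignore "not found"
    let listener = UnixListener::bind(&path)?;
    let client_path = path.clone();
    let client = thread::spawn(move || send_messages(&client_path, b"this is an email\n", n));
    let total = serve_inline(&listener, n)?;
    client.join().expect("client thread panicked")?;
    std::fs::remove_file(&path)?;
    Ok(total)
}

fn main() -> io::Result<()> {
    // Hypothetical socket path and message count, chosen for the demo.
    let path = std::env::temp_dir().join(format!("bench-{}.sock", std::process::id()));
    let n = 10_000;
    let start = Instant::now();
    let total = run_benchmark(path, n)?;
    println!("{} messages, {} bytes in {:?}", n, total, start.elapsed());
    Ok(())
}
```

If a loop like this finishes in a small fraction of the time the shell harness takes, that points at process creation rather than the socket path as the dominant cost.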

Using a client with optional multi-threading:

use std::os::unix::net::UnixStream;
use std::io::prelude::*;
use std::io::{ErrorKind, stdin};
use std::path::Path;
use std::env;
use std::process::exit;
use std::thread;

fn main() {
    let mut args = env::args().skip(1);
    let socket_file_path = args.next().expect(
        "Could not parse socket file path from command args",
    );
    let n = args.next()
        .expect("Could not parse n")
        .parse::<usize>()
        .unwrap();
    let t = args.next()
        .expect("Could not parse t")
        .parse::<usize>()
        .unwrap();
    let mut email_body: Vec<u8> = Vec::with_capacity(1024);
    stdin().read_to_end(&mut email_body).expect(
        "Could not read email from stdin",
    );
    let mut threads = Vec::new();
    for _ in 0..t {
        let email_body = email_body.clone();
        let socket_file_path = socket_file_path.clone();
        threads.push(thread::spawn(move || for _ in 0..n / t {
            loop {
                let connected = UnixStream::connect(Path::new(&socket_file_path));
                match connected {
                    Ok(mut stream) => {
                        if let Err(e) = stream.write_all(email_body.as_ref()) {
                            eprintln!("Writing to socket failed due to {}", e);
                            exit(1);
                        } else {
                            stream.shutdown(std::net::Shutdown::Write).expect(
                                "bad shutdown",
                            );
                            break;
                        }
                    }
                    Err(e) => {
                        if e.kind() == ErrorKind::Interrupted {
                            continue;
                        } else if e.kind() == ErrorKind::ConnectionRefused {
                            thread::sleep(std::time::Duration::new(0, 1000));
                            continue;
                        } else {
                            eprintln!("Connecting to socket failed due to {}", e);
                            exit(1);
                        }
                    }
                }
            }
        }));
    }
    for t in threads {
        println!("{:?}", t.join());
    }
    println!("Done!");
}

And the Tokio daemon:

extern crate futures;
extern crate num_cpus;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_uds;

use std::env;
use std::path::{Path, PathBuf};

use futures::Future;
use futures::stream::Stream;
use tokio_io::io::{read_to_end, shutdown};
use tokio_uds::UnixListener;
use tokio_core::reactor::Core;

fn main() {
    let addr = env::args().nth(1).expect(
        "Could not parse socket path address",
    );
    let path = PathBuf::from(addr);
    read(&path);
}

fn read(addr: &Path) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let listener = UnixListener::bind(addr, &handle).unwrap();

    let server = listener.incoming().for_each(move |(socket, _)| {
        let buf = Vec::new();
        let msg = read_to_end(socket, buf)
            .map(move |(socket, _)| { shutdown(socket); })
            .map_err(move |e| eprintln!("Error! {}", e));
        handle.spawn(msg);
        Ok(())
    });

    core.run(server).unwrap();
}

I get the following stats with 100,000 messages:

Mac

$ ./daemon /tmp/socket.s &
$ time echo foo | ./client /tmp/socket.s 100000 1
Ok(())
Done!

real	0m3.435s
user	0m0.103s
sys	0m3.329s

$ time echo foo | ./client /tmp/socket.s 100000 2
Ok(())
Ok(())
Done!

real	0m1.374s
user	0m0.183s
sys	0m2.358s

$ time echo foo | ./client /tmp/socket.s 100000 3
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error { repr: Os { code: 24, message: "Too many open files" } }', src/libcore/result.rs:860
note: Run with `RUST_BACKTRACE=1` for a backtrace.

Work CentOS

$ time echo "foo" | ./client /tmp/socket.s 100000 1
Ok(())
Done!

real	0m0.404s
user	0m0.031s
sys	0m0.251s

$ time echo "foo" | ./client /tmp/socket.s 100000 2
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error { repr: Os { code: 24, message: "Too many open files" } }', /checkout/src/libcore/result.rs:860
note: Run with `RUST_BACKTRACE=1` for a backtrace.

So creating the sockets in a loop is definitely faster (and I imagine just sending messages over one socket would be faster still). Also don't know why the daemon errors out in the multi-threaded case; maybe it just can't keep up with the volume? Is the best solution in this case to just restart the Core loop?
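
For what "just sending messages over one socket" might look like: once a connection carries more than one email, EOF can no longer mark the end of a message, so some framing is needed. Below is a minimal sketch that assumes a 4-byte big-endian length prefix per message. The framing scheme, the `MAX_FRAME` cap, and the function names are assumptions for illustration, not anything Postfix or the daemon above already does. `UnixStream::pair()` stands in for a real client/daemon connection.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;

/// Hypothetical upper bound on one message; not a value from the thread.
const MAX_FRAME: usize = 16 * 1024 * 1024;

/// Writes one frame: a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "message too large"));
    }
    w.write_all(&(payload.len() as u32).to_be_bytes())?;
    w.write_all(payload)
}

/// Reads one frame into `buf`, reusing its allocation.
/// Returns Ok(false) when the peer closed the connection before a new frame.
/// In this sketch a truncated header is treated the same as a clean EOF.
fn read_frame<R: Read>(r: &mut R, buf: &mut Vec<u8>) -> io::Result<bool> {
    let mut header = [0u8; 4];
    match r.read_exact(&mut header) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(false),
        Err(e) => return Err(e),
    }
    let len = u32::from_be_bytes(header) as usize;
    if len > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    buf.resize(len, 0);
    r.read_exact(&mut buf[..])?;
    Ok(true)
}

fn main() -> io::Result<()> {
    // A connected pair of sockets stands in for client and daemon.
    let (mut client, mut daemon) = UnixStream::pair()?;
    for msg in ["first email", "second email", "third email"] {
        write_frame(&mut client, msg.as_bytes())?;
    }
    drop(client); // closing the client end lets the reader loop terminate

    let mut buf = Vec::new();
    let mut count = 0;
    while read_frame(&mut daemon, &mut buf)? {
        count += 1;
    }
    println!("received {} messages over one connection", count);
    Ok(())
}
```

The trade-off is that the daemon now has to validate lengths and cope with partial frames, which the connection-per-message design gets for free from EOF.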

I was under the impression that Postfix requires spawning a process per email, but after digging deeper into the documentation it seems like I might be able to have Postfix forward email to a local address instead. If so, I can definitely achieve some better performance there. If that's the case, then the only thing I need to worry about is hardening up my daemon. Thanks for the rubber ducking!

Did you try with the backtrace? I think it's your client that likely runs out of file descriptors, since it's creating a new connection on each loop iteration.

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error { repr: Os { code: 24, message: "Too many open files" } }', src/libcore/result.rs:860
stack backtrace:
   0: std::sys::imp::backtrace::tracing::imp::unwind_backtrace
   1: std::panicking::default_hook::{{closure}}
   2: std::panicking::default_hook
   3: std::panicking::rust_panic_with_hook
   4: std::panicking::begin_panic
   5: std::panicking::begin_panic_fmt
   6: rust_begin_unwind
   7: core::panicking::panic_fmt
   8: core::result::unwrap_failed
   9: daemon::main
  10: __rust_maybe_catch_panic
  11: std::rt::lang_start

It happens when I call unwrap on core.run(server). Maybe I'm closing the sockets improperly? Ideally, there should only ever be T client connections open at a time, where T is the number of threads.

Oh indeed, I glossed over the unwrap call on it :(

Is there a reason your daemon calls the shutdown that returns a future, only for that future to be dropped inside the closure? Any reason not to call the regular shutdown? If the event loop is busy servicing other events, it may not get around to shutting the socket down for a while. I'm not sure, but perhaps that's keeping the socket alive?
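
For reference, this is roughly what the "regular" (synchronous) shutdown looks like with the standard library's `UnixStream`, as opposed to building a shutdown future that never gets polled. It is only a sketch of the std API's behavior, with hypothetical helper names `send_and_close` and `receive_and_close`, and it is not a diagnosis of the "Too many open files" error above.

```rust
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::os::unix::net::UnixStream;

/// Sends `body`, then half-closes the socket so the peer sees EOF right away.
fn send_and_close(mut stream: UnixStream, body: &[u8]) -> io::Result<()> {
    stream.write_all(body)?;
    // Synchronous: the write half is shut down by the time this returns.
    stream.shutdown(Shutdown::Write)?;
    Ok(())
    // `stream` is dropped here, which closes the file descriptor.
}

/// Reads until EOF, then shuts the socket down synchronously.
fn receive_and_close(mut stream: UnixStream) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    stream.read_to_end(&mut buf)?;
    match stream.shutdown(Shutdown::Both) {
        Ok(()) => {}
        // Some platforms report NotConnected when the peer has already
        // closed its end; the socket is going away either way.
        Err(e) if e.kind() == io::ErrorKind::NotConnected => {}
        Err(e) => return Err(e),
    }
    Ok(buf)
    // Dropping `stream` releases the descriptor regardless of shutdown.
}

fn main() -> io::Result<()> {
    // A connected pair of sockets stands in for client and daemon.
    let (client, daemon) = UnixStream::pair()?;
    send_and_close(client, b"foo\n")?;
    let received = receive_and_close(daemon)?;
    println!("daemon read {} bytes before EOF", received.len());
    Ok(())
}
```

With std sockets, the descriptor is released when the stream value is dropped, so the thing to check in the daemon is how long each accepted socket stays alive before that happens.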