Code review request: unilateral IPC on Unix, Application uniqueness

This program

  1. Detects application uniqueness, if the application is a unique/primary instance, it launches a server, otherwise a client over a Unix domain socket.

  2. Client will send a message that will trigger the server to invoke a time-heavy task.

  3. When the task is running, the server will ignore all the incoming messages. This is currently worked-around by making the client send the current time to the server. Server keeps track of the heavy task and when it finishes. I'd like a way to actually drop the socket before the task starts but I can't find a way to re-activate the socket.

///// Begin imports
use {
    nix::{
        errno::Errno::EADDRINUSE,
        sys::socket::{
            bind, connect, recv, send, socket, AddressFamily, MsgFlags, SockAddr, SockFlag,
            SockType, UnixAddr,
        },
        unistd::close,
        Error::Sys,
    }, // "0.15.0"
    std::{error::Error, os::unix::io::RawFd},
};
///// End imports

///// Begin constsnts
static SOCK_ADDR: &'static str = "com.localserver.myapp.sock";
//// End constants

//// Begin macros
macro_rules! Expected {
    () => {
        Result<(), Box<dyn Error>>
    };
    ($t:ty) => {
        Result<$t, Box<dyn Error>>
    }
}
//// End Macros

//// Begin helper functions
fn heavy_task() -> Expected!() {
    println!("Performing heavy task");
    std::thread::sleep(std::time::Duration::from_secs(5));
    println!("Task finished.");
    Ok(())
}
/// Return the current timestamp in nanosecond precision
fn get_time() -> Expected!(u128) {
    Ok(std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_nanos())
}
//// End helper functions

fn server(msgsock: RawFd) -> Expected!() {
    // listen(sock, MESSAGE_QUEUE_SIZE)?;
    let mut earliest_non_busy_time: u128 = get_time()?;

    loop {
        println!("Listening...");
        loop {
            let mut buf = [0u8; std::mem::size_of::<u128>()];
            let rval = recv(msgsock, &mut buf[..], MsgFlags::empty())?;
            if rval != std::mem::size_of::<u128>() {
                break;
            } else {
                let incoming_time = u128::from_ne_bytes(buf);
                if incoming_time > earliest_non_busy_time {
                    heavy_task()?;
                    earliest_non_busy_time = get_time()?;
                } else {
                    println!("Task request rejected since the task had already been running");
                }
            }
        }
    }
    // Ok(()) // unreachable
}

fn client(sock: RawFd, addr: &SockAddr) -> Expected!() {
    match connect(sock, addr) {
        Ok(_) => {
            let current_time: u128 = get_time()?;
            let msg = current_time.to_ne_bytes();
            if send(sock, &msg, MsgFlags::empty())? != std::mem::size_of::<u128>() {
                close(sock)?;
                panic!("Message could not be sent");
            }
            close(sock)?;
            Ok(())
        }
        Err(e) => {
            close(sock)?;
            panic!("Error connecting to socket: {}", e);
        }
    }
}

fn main() -> Expected!() {
    let sock: RawFd = socket(
        AddressFamily::Unix,
        SockType::Datagram,
        SockFlag::empty(),
        None, // Protocol
    )?;

    let addr = SockAddr::Unix(UnixAddr::new_abstract(SOCK_ADDR.as_bytes())?);

    match bind(sock, &addr) {
        Err(e) => match e {
            Sys(EADDRINUSE) => {
                println!("Secondary instance detected, launching client.");
                match client(sock, &addr) {
                    Ok(_) => println!("Message sent."),
                    Err(_) => println!("Message sending failed."),
                }
                println!("Exiting...");
            }
            _ => {
                panic!("Socket binding failed due to: {:?}", e);
            }
        },
        Ok(_) => {
            println!("Primary instance detected, launching server.");
            server(sock)?;
        }
    }
    Ok(())
}

See IPC on POSIX ($1893594) · Snippets · Snippets · GitLab if you want a continuous view

Further explained here: Detecting Application Uniqueness and interprocess communication using Unix Domain Socket ($1903637) · Snippets · Snippets · GitLab

While I admit your Expected macro is clever, the idiomatic solution to this is a type alias:

type Result<T> = std::result::Result<T, Box<Dyn Error>>

fn get_time() -> Result<u64> { /* ... */ }

I'm not familiar with the functions you use to create your socket, but it seems like it'd be really nice to put the close in drop implementation for some type, to make it impossible to forget.

Please note that write does not guarantee that it wrote all the bytes you gave it! It might only write some of them, and you can see how many by inspecting the returned usize. Typically one would have a loop that writes until it has all been written.

What do you mean with re-activating the socket? If it's closed, it's closed, and that's it. You'd have to establish a new connection.

1 Like

Thank you for pointing that out. I'm guessing the same applies to the read(2) function in the server. Now I'm thinking of ways to determine whether the server read it completely or not (should I count the digits and compare with current time? etc). I've changed to code on the client side:

fn server(sock: RawFd) -> Expected!() {
    listen(sock, MESSAGE_QUEUE_SIZE)?; 
    let mut earliest_non_busy_time = get_time()?;

    loop {
        println!("Listening...");
        match accept(sock) {
            Ok(msgsock) => loop {
                let mut buf = [0u8; MESSAGE_CAPACITY];
                let rval = read(msgsock, &mut buf[..])?;
                if rval == 0 {
                    break;
                } else {
                    // trim-off the trailing null bytes, parse the buffer as a u64
                    let incoming_time = std::str::from_utf8(&buf)?
                        .trim_end_matches('\0')
                        .parse::<u64>()?;

                    if incoming_time > earliest_non_busy_time {
                        heavy_task()?;
                        earliest_non_busy_time = get_time()?;
                    } else {
                        println!("Task request rejected since the task had already been running");
                    }
                }
            },
            Err(e) => panic!("Error accepting socket {:?}", e),
        }
    }
    // Ok(()) // unreachable
}

Expected macro vs type alias

The only reason I prefer the macro is because I can "utilize" rust's module import system -- since the macros always sit on top of the crate I can just use crate::Expected!() instead of crate::types::Result<()> which requires more typing. I often utilize macros to declare modules this way:

macros.rs

#[macro_export]
macro_rules! mods {
    ($($id:ident),+) => {
        $(mod $id;)+
    };
}

#[macro_export]
macro_rules! pub_mods {
    ($($id:ident),+) => {
        $(pub mod $id;)+
    };
}

Everywhere else:

mods!(aux, app, ui, consts);
1 Like

Your usage of macros is definitely interesting... Note that you can put the result type in the top-level by using a pub use.

As for reading, you typically need to know how many bytes you want in order to correctly read them. When sending strings, a typical solution is to send along the length just before the string. Why not use from_be_bytes and to_be_bytes to send your timestamp as eight bytes instead of an unknown number of ascii characters?

That is indeed a very good point, thank you. I also used datagram instead of streams too. Datagrams over domain sockets are always reliable as unix(7) - Linux manual page says.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.