Connection issues with client/server communicating over socket

hey guys,

I have a server running on a local socket in a cargo workspace. The client and server use an RPC service defined via tarpc, and are attempting to communicate over a Unix domain socket.
this is the server code:

#[tokio::main]
async fn main() {
    // remove any stale socket file from a previous run before binding
    let socket_path = PathBuf::from("/tmp/sniper.socket");
    let _ = std::fs::remove_file(&socket_path);
    let listener = UnixListener::bind(socket_path).unwrap();

    let codec_builder = LengthDelimitedCodec::builder();

    let sniper = Arc::new(Mutex::new(sniper::Sniper::new()));
    tokio::spawn(async move {
        loop {
            let (stream, _addr) = listener.accept().await.unwrap();
            let framed_stream = codec_builder.new_framed(stream);
            let transport = serde_transport::new(framed_stream, Json::default());
            let server = ConnectionHandler::new(sniper.clone());
            let fut = server::BaseChannel::with_defaults(transport).execute(server.serve());
            // one task per incoming connection
            tokio::spawn(fut);
        }
    });
}

this is the client code:

#[tokio::main]
pub async fn main() {
    println!("Hello from sniper client!");

    let session_id = "12345";
    let test_uri = "test.py";
    let lang = "python";
    let socket_path = "/tmp/sniper.socket";
    let codec_builder = LengthDelimitedCodec::builder();

    // Connect to the server socket
    let conn = UnixStream::connect(socket_path).await.unwrap(); // <- line 23:55, where the second error happens
    let transport = serde_transport::new(codec_builder.new_framed(conn), Json::default());
    let client = SniperServiceClient::new(Default::default(), transport).spawn();

    client
        .add_target(tarpc::context::current(), session_id.to_string(), test_uri.to_string(), lang.to_string())
        .await;

    let snippet = client
        .get_snippet(tarpc::context::current(), lang.to_string(), "if/elif/else".to_string())
        .await;

    println!("{:?}", snippet);
}

I'm setting up the server to listen on the socket with this test script:

# get the project home
proj_home=$(git rev-parse --show-toplevel)
# build the server
(cd "$proj_home/sniper" && cargo build)
# build the client
(cd "$proj_home/sniper-client" && cargo build)
bin_path="$proj_home/target/debug"

# set up the server to activate automatically on socket connection
systemd-run --user \
    --socket-property=ListenStream=/tmp/sniper.socket \
    --socket-property=NoDelay=true \
    "$bin_path/sniper-server"

However, the first time I run the client (after running the test script) I get this error:

Hello from sniper client!
Err(Kind(ConnectionReset))

the second time I run the client, I get this error:

Hello from sniper client!
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', sniper-client/src/main.rs:23:55

This is the same error I get when not using a systemd socket at all. Line 23 is where the client's UnixStream connects to the socket path.

I think the issue in the second case is that the socket is still there but the server is no longer running/listening. I'm not sure what the first error is about.

My understanding of systemd sockets is that they run the associated service/command when anything connects to the socket, and shut down the program when there are no more active connections, but if that were correct then there would be no difference between the first and second connection.

Your server should exit immediately: when you return from main, all other tasks are killed. Either remove the outer tokio::spawn, or .await the JoinHandle that the tokio::spawn call returns.
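
For the first option, this is roughly what it would look like (a sketch reusing your snippet's names and imports, not something I've compiled against your crate):

#[tokio::main]
async fn main() {
    let socket_path = PathBuf::from("/tmp/sniper.socket");
    let _ = std::fs::remove_file(&socket_path);
    let listener = UnixListener::bind(socket_path).unwrap();

    let codec_builder = LengthDelimitedCodec::builder();
    let sniper = Arc::new(Mutex::new(sniper::Sniper::new()));

    // Run the accept loop directly in main, so the process only exits
    // if the loop itself returns or panics.
    loop {
        let (stream, _addr) = listener.accept().await.unwrap();
        let framed_stream = codec_builder.new_framed(stream);
        let transport = serde_transport::new(framed_stream, Json::default());
        let server = ConnectionHandler::new(sniper.clone());
        let fut = server::BaseChannel::with_defaults(transport).execute(server.serve());
        // Still spawn one task per connection so multiple clients can be served.
        tokio::spawn(fut);
    }
}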

That fixed the connection reset issue, though after adding some print statements I can see it times out on the second client request (get_snippet). The first request took around 5 seconds, which may just be the time it takes the server to initialize. Note this was the case with both changes (removing the outer spawn or awaiting its result).

This is my first time writing an (asynchronous) client/server program. What are some things I should put in place to trace the source of issues and delays (in Rust or in general)?

Also, should I make tests part of the server or the client, given that the server is set up to receive function calls as serialized JSON requests?

Can you post the new server? I would definitely not expect it to take 5 seconds to run — that's very very slow.

You may find this article helpful.
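
On tracing the source of delays in general: one low-effort starting point (a sketch with assumed dependencies, not specific to your project) is to install a tracing subscriber in both binaries and time the individual calls:

// assumed dependencies: tracing = "0.1", tracing-subscriber = "0.3"
use std::time::Instant;
use tracing::info;

#[tokio::main]
async fn main() {
    // Simple subscriber that prints events to stdout.
    tracing_subscriber::fmt::init();

    let start = Instant::now();
    // ... connect and make an RPC call here ...
    info!(elapsed_ms = start.elapsed().as_millis() as u64, "request finished");
}

Recent versions of tarpc emit their own tracing events as well, so once a subscriber is installed you may get some extra visibility for free.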

Okay, a bit of an update: the first request takes 10 seconds on the first run, but on a second cargo run from the client folder the request completes almost instantaneously, so this may just be the overhead of systemd initializing the server (I don't have enough experience to know whether daemonizing directly would offer any real startup benefit, though).

On both runs the second request (returning a snippet) times out.

this is the only part that changed:

...
    tokio::spawn(async move {
        loop {
            let (stream, _addr) = listener.accept().await.unwrap();
            let framed_stream = codec_builder.new_framed(stream);
            let transport = serde_transport::new(framed_stream, Json::default());
            let sniper_server = ConnectionHandler::new(sniper.clone());
            let fut = server::BaseChannel::with_defaults(transport).execute(sniper_server.serve());
            tokio::spawn(fut).await.unwrap();
        }
    }).await.unwrap();

Thanks for the link. I'm reading it now.

For future reference: the timeout seems to be related to systemd, whether that's due to misconfiguration on my part or a current limitation of systemd.

When I start the server directly and then start the client separately, the requests are handled fine.

When I try to activate it via a systemd socket, all requests seem to time out waiting for a response.

Are you using UnixListener::from_std anywhere? I've found that it started hanging after upgrading to tokio 1.x:

(I responded to that issue)

No, I'm using the UnixListener/UnixStream from tokio::net. The client works fine when I start the server manually, but something about either how I am instantiating the systemd socket, or systemd itself (and possibly SELinux), is causing all requests to hang/time out, which might just be because the server isn't starting at all.

I've tried running it directly from the debug binary location and via cargo run. systemd should be path-aware in my case (hence cargo should run) because I export my environment to the systemd --user environment on login.
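
One thing I haven't tried yet: my server binds the socket path itself, but as far as I understand socket activation, the service is supposed to take over the already-bound listener that systemd passes in as fd 3 instead of binding its own. Something like this is what I think that would look like (untested sketch; the env check is minimal, and a real version should also verify LISTEN_PID):

use std::os::unix::io::FromRawFd;
use std::os::unix::net::UnixListener as StdUnixListener;
use tokio::net::UnixListener;

fn take_systemd_listener() -> std::io::Result<UnixListener> {
    // systemd sets LISTEN_FDS and passes the first socket as fd 3.
    let fds: usize = std::env::var("LISTEN_FDS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(0);
    assert!(fds >= 1, "no socket passed by systemd");

    // SAFETY: we assume fd 3 is the listener systemd handed us.
    let std_listener = unsafe { StdUnixListener::from_raw_fd(3) };
    // from_std requires the fd to already be in non-blocking mode,
    // otherwise accepts will block/hang.
    std_listener.set_nonblocking(true)?;
    UnixListener::from_std(std_listener)
}

That listener would then replace the UnixListener::bind(...) call in main.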

By the way, I currently have a struct wrapped in a tokio::sync::RwLock that contains only two shared concurrent data structures (DashMaps). Is it safe to wrap the containing struct in only an Arc, and if not, is there anything I can implement on the struct to avoid locking?

The DashMap type contains locks internally, so you should not need an RwLock around it. You may find the shared state chapter in the Tokio tutorial useful.
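
As a rough illustration (generic types, not your exact structs): a struct whose fields are all Sync is itself Sync, so if every field is a concurrent map it can usually be shared behind a plain Arc, because DashMap's methods only take &self:

use std::sync::Arc;
use dashmap::DashMap;

struct Store {
    snippets: DashMap<String, String>,
}

#[tokio::main]
async fn main() {
    let store = Arc::new(Store { snippets: DashMap::new() });

    let writer = store.clone();
    tokio::spawn(async move {
        // insert takes &self, so no outer lock is needed
        writer.snippets.insert("if/elif/else".to_string(), "...".to_string());
    })
    .await
    .unwrap();

    // reads through the original Arc are fine concurrently as well
    if let Some(s) = store.snippets.get("if/elif/else") {
        println!("{}", *s);
    }
}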

I was mainly wondering about the containing struct:

#[derive(Debug)]
pub struct Sniper {
    pub(crate) snippets: DashMap<(String, String), Snippet>,
    pub(crate) snippet_sets: DashMap<(String, String), SnippetSet>,
}

which is owned by this struct

#[derive(Clone)]
pub(crate) struct RequestHandler {
    pub(crate) config: Arc<Mutex<SniperConfig>>,
    pub(crate) targets: Arc<DashMap<(String, String), TargetData>>,
    pub(crate) sniper_lock: Arc<tokio::sync::RwLock<Sniper>>,
}

While DashMap doesn't require locks, my reasoning was that I wasn't sure whether accessing the containing struct was thread safe, even though its members are built for concurrent operations.

I was wondering if that was the case, and if so, whether there was something about the struct I could change in order to avoid locking access.