How to re-connect to a socket and capture errors from 'sink.send_all()'

Hello All,

I'm trying to develop an application that reads data from a socket and an mpsc channel in parallel. The code snippet follows.

const SERVER_PATH: &'static str = "/tmp/server";
const CLIENT_PATH: &'static str = "/tmp/client";

pub struct StringDatagramCodec;
impl UnixDatagramCodec for StringDatagramCodec {
	type In = String;
	type Out = String;
	fn decode(&mut self, _src: &std::os::unix::net::SocketAddr,buf: &[u8],) -> io::Result<Self::In> {
		let decoded = String::from_utf8_lossy(buf).into_owned();
		Ok(decoded)
	}
	fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<PathBuf> {
		buf.extend_from_slice(&msg.into_bytes());
		Ok(Path::new(SERVER_PATH).to_path_buf())
	}
}

// This is a simple server, implements 'Service' and forwards the received http request URI's on 'tx'.
pub struct Sampleserver {
	pub tx: mpsc::Sender<String>,
}

pub fn create_framed_sock(handle: &tokio_core::reactor::Handle) -> tokio_uds::UnixDatagramFramed<StringDatagramCodec>
{
	let sock = UnixDatagram::bind(CLIENT_PATH, &handle).unwrap().framed(StringDatagramCodec);
	sock
}

pub fn waitForMsgsFromServers(handle: &tokio_core::reactor::Handle, rx: mpsc::Receiver<String>)
{
	let sock = create_framed_sock(handle);
	let f = sock.send("START".to_string()).and_then(move |sock| {
		let (sink, stream) = sock.split();

		// wait for msgs from server which is bound to 'SERVER_PATH'
		let mut mapped_stream = stream.map(move |msg| {
			let mut ret = msg.to_string();
			// <do some processing with the data>
			ret
		}).select(rx.map_err(|()| {  // wait for msgs from Sampleserver
			use std::io::{Error, ErrorKind};
			Error::new(ErrorKind::Other, "end of stream")
		}));
		sink.send_all(mapped_stream)
	});
	handle.spawn(f.map(|_| ()).map_err(|_| ()));
}


fn main(){
	let (tx, rx) = mpsc::channel::<String>(5); // extra buffer
	let http_server = {
		Http::new().bind(&addr, move || {
			Ok(http_hyper::http_hyper::Sampleserver::new(
				tx.clone(),
			))
		})?
	};

	waitForMsgsFromServers(&handle, rx);
	http_server.run()?;
}

Assume the process that creates the Unix domain socket bound to "/tmp/server" is called 'Process2'.

The above code works fine and receives data from both 'Sampleserver' and 'Process2'.

However, problems occur when 'Process2' exits ('Process2' deletes the socket "/tmp/server" on exit). Here are a few of them:
* 'waitForMsgsFromServers' no longer receives messages sent from 'Sampleserver'.
* There is a chance that 'Process2' will be re-spawned as soon as it exits. 'waitForMsgsFromServers' should be able to reconnect to the newly created socket ("/tmp/server"). In this case, I would like to call 'waitForMsgsFromServers' again, but I am not sure where exactly to make this call (considering that 'rx' cannot be re-used).
Will it work if errors from 'sink.send_all' are captured?

Could anyone help me with the above problems?
Thanks in Advance!!!

You might want to look at futures::future::loop_fn to wrap your Process2 reading (and re-connecting) code.

Hello @vitalyd, thank you for the suggestion. I did modify the code to include 'loop_fn' as you suggested. Following is the modified code:

pub fn waitForMsgsFromServers(handle: &tokio_core::reactor::Handle, rx: &std::sync::Arc<std::sync::Mutex<futures::sync::mpsc::Receiver<std::string::String>>>) -> Box<Future<Item = (), Error = std::io::Error>> {
	let sock = create_framed_sock(handle);
	let recv_obj = *rx.lock().unwrap();
	let f = sock.send("START".to_string()).and_then(move |sock| {
		let (sink, stream) = sock.split();

		// wait for msgs from server which is bound to 'SERVER_PATH'
		let mut mapped_stream = stream.map(move |msg| {
			let mut ret = msg.to_string();
			// <do some processing with the data>
			ret
		}).select(recv_obj.map_err(|()| {  // wait for msgs from Sampleserver
			use std::io::{Error, ErrorKind};
			Error::new(ErrorKind::Other, "end of stream")
		}));
		sink.send_all(mapped_stream).map(|_| {
			println!("CLIENT DISCONNECTED");
			()
		}).map_err(|err| err)
	});
	let client = f.map_err(|_| { panic!()});
	Box::new(client)
}


fn main(){
	let (tx, rx) = mpsc::channel::<String>(5); // extra buffer
	let http_server = {
		Http::new().bind(&addr, move || {
			Ok(http_hyper::http_hyper::Sampleserver::new(
				tx.clone(),
			))
		})?
	};

	let handle = http_server.handle();
	let handle_clone = handle.clone();
	let rx_clone = Arc::new(Mutex::new(rx));

	let infinite = loop_fn((), move|_| {
		waitForMsgsFromServers(&handle_clone, &rx_clone.clone()).map(|_| -> Loop<(), ()> {
			Loop::Continue(())
		})
	});
	let t = Box::new(infinite);
	handle.spawn(t.map(|_| ()).map_err(|_| ()));

	http_server.run()?;
}

However, I am stuck with the following error:

I have wrapped 'rx' in "Arc::new(Mutex::new(rx))" so that it can be cloned, as this value cannot be moved inside a loop.

Could you please provide your views on this issue?

let rx_clone = Arc::new(Mutex::new(rx));

	let infinite = loop_fn((), move|_| {
		waitForMsgsFromServers(&handle_clone, &rx_clone.clone()).map(|_| -> Loop<(), ()> {
			Loop::Continue(())
		})
	});

I think you want something like this instead:

let rx_arc = Arc::new(Mutex::new(rx));
let rx_clone = rx_arc.clone();

	let infinite = loop_fn((), move|_| {
		waitForMsgsFromServers(&handle_clone, &rx_clone).map(|_| -> Loop<(), ()> {
			Loop::Continue(())
		})
	});

Hello @vitalyd,
Have tried the above code you suggested. But this created the following error.

Could you please help with this?

waitForMsgsFromServers should take rx: Arc<...>, not rx: &Arc<...>, as an argument. Adjust the call to it in the main function to pass rx_clone instead of &rx_clone.

Hello @vitalyd,
I did try to pass 'rx_clone' without reference. In that case,
The following error is thrown from main:(where loop_fn is present)

'waitForMsgsFromServers' throws the following error:
error!

Ok, yeah - compiler is right :) Any way you can put something in the playground? Might be easier to iterate.

At any rate, since loop_fn is, well, potentially looping, each iteration of it needs a fresh Arc clone. So you can pass a clone of the Arc as the initial_state arg of the loop_fn and that gets you the initial Arc. When you return a Continue value from the loop_fn, you should either return another Arc clone in it or move the Arc instance you had into it. That will allow each iteration to have its own Arc and should get you past that error.
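To make the state-threading idea concrete without pulling in futures, here is a minimal synchronous sketch using only the standard library. `Loop`, `loop_fn_sync` and `run_attempts` are hypothetical stand-ins written for this illustration (they are not the futures API); the point is only that the Arc travels in as the initial state and is moved back out through `Continue`, while each iteration clones it for whatever work it hands off.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for futures::future::Loop:
// either finish with a value or go around again with the state.
enum Loop<T, S> {
    Break(T),
    Continue(S),
}

// Hypothetical synchronous analogue of loop_fn: the body takes
// ownership of the state on every iteration and must hand it back.
fn loop_fn_sync<S, T, F>(initial: S, mut body: F) -> T
where
    F: FnMut(S) -> Loop<T, S>,
{
    let mut state = initial;
    loop {
        match body(state) {
            Loop::Break(value) => return value,
            Loop::Continue(next) => state = next,
        }
    }
}

// Runs until the shared counter reaches `attempts`.
fn run_attempts(attempts: u32) -> u32 {
    let counter = Arc::new(Mutex::new(0u32));
    loop_fn_sync(counter, move |shared| {
        // Fresh clone for this iteration's "worker"; the original
        // Arc stays free to be moved into Continue below.
        let for_worker = Arc::clone(&shared);
        let seen = {
            let mut guard = for_worker.lock().unwrap();
            *guard += 1;
            *guard
        };
        if seen >= attempts {
            Loop::Break(seen)
        } else {
            Loop::Continue(shared)
        }
    })
}

fn main() {
    println!("finished after {} attempts", run_attempts(3));
}
```

With the real loop_fn the shape is the same, except that the body returns a future that resolves to the `Loop` value.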

The second issue is that you’re getting a MutexGuard out of the Mutex, which is a scoped handle that gives you access to the data protected by the Mutex. The guard holds a reference back to the mutex, so it has a lifetime associated with it, while the closures you’re creating must not borrow anything (ie the 'static requirement). So you need to access the lock-protected data via the Arc and Mutex, but keep the access scoped so that you don’t "forward" the mutex guard to other closures that have a different lifetime from the block of code you’re executing.

Put another way, the Arc keeps the mutex alive across the various closures/futures - this satisfies the 'static requirement. Then, when you need access to the data, go through the Arc and Mutex but keep that access confined to the closure; if you need to access the data again in a different closure (eg a continuation), do it again via the Arc and Mutex, rather than trying to pass the MutexGuard along. Hope that makes sense.
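A small standard-library sketch of that rule, assuming threads as a stand-in for spawned futures (thread::spawn has the same 'static requirement on its closure). The function name `push_from_threads` is made up for the example. Each closure owns its own Arc clone and takes the lock inside the closure, so no MutexGuard ever crosses a closure boundary.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Spawns `n` threads that each push their index into a shared Vec.
fn push_from_threads(n: usize) -> Vec<usize> {
    let shared = Arc::new(Mutex::new(Vec::<usize>::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            // Each closure gets its own Arc clone, so it owns
            // everything it touches and is 'static.
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Lock inside the closure; the guard is a temporary
                // that is dropped at the end of this statement.
                shared.lock().unwrap().push(i);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Lock again here instead of reusing an earlier guard.
    let mut result: Vec<usize> = shared.lock().unwrap().clone();
    result.sort();
    result
}

fn main() {
    println!("{:?}", push_from_threads(4));
}
```

Trying to take the guard outside and move it into the spawned closure is the pattern the compiler rejects, because the guard borrows from the mutex.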

@vitalyd, Thank you for the response.

Below are the results of the few experiments I did with the code at playground.
The code in the playground doesn't compile due to unavailability of 'tokio_uds' in the build environment.

I tried to dereference the Arc Mutex by calling 'rx.lock().unwrap().deref()' as you suggested (MutexGuard).
But it throws the following error.

Tried passing Arc instance to the 'loop_fn' and made sure continue uses the same instance to loop again.

Got the following error when 'rx_clone' is passed to 'loop_fn':

Got the following error when the reference of 'rx_clone' is passed to 'loop_fn':

I am kind of stuck with this problem. Could you please tell me where I'm going wrong?
Or is there any mechanism other than an mpsc channel that could be used to pass information from the server to 'waitForMsgsFromServers'?

Maybe something like this:

fn run() -> Result<()> {
    let addr = "127.0.0.1:5000".parse()?;
    let (tx, rx) = mpsc::channel::<String>(5);
    let rx_arc = Arc::new(Mutex::new(rx));
    let rx_clone = rx_arc.clone();
    let http_server = { Http::new().bind(&addr, move || Ok(SampleServer::new(tx.clone())))? };
    let handle = http_server.handle();
    let test = handle.clone();
    let infinite = loop_fn(rx_clone, move |rx| {
        waitForMsgsFromServers(&test, rx.clone()).map(|_| -> Loop<(), Arc<Mutex<mpsc::Receiver<String>>>> {
            println!("Continue:: loop");
            Loop::Continue(rx)
        })
    });
    handle.spawn(infinite.map(|_| ()).map_err(|_| ()));
    http_server.run()?;
    Ok(())
}

Note that loop_fn receives the rx_clone in its initial_state argument. That bootstraps the looping. For waitForMsgsFromServers:

pub fn waitForMsgsFromServers(
    handle: &tokio_core::reactor::Handle,
    rx: Arc<Mutex<mpsc::Receiver<std::string::String>>>, ) -> Box<Future<Item=(), Error=std::io::Error>> {
    let sock = create_uds_sock(&handle);
    let f = sock.send("START".to_string()).and_then(move |sock| {
        let (sink, stream) = sock.split();
        let mapped_stream = stream
            .map(move |msg| {
                let mut ret = "".to_string();
                println!("Msg from wpa:: {}", msg);
                ret
            });
        let poll_rx =
            futures::stream::poll_fn(move || rx.lock().unwrap().poll())
                            .map_err(|_| {
                                use std::io::{Error, ErrorKind};
                                Error::new(ErrorKind::Other, "Select stream error")
                            });

        sink.send_all(mapped_stream.select(poll_rx)).map(|_| println!("CLIENT DISCONNECTED")).map_err(|err| err)
    });
    let client = f.map_err(|_| panic!());
    Box::new(client)
}

I added the poll_rx stream which will internally acquire the mutex and poll the receiver. We move the Arc into it so that it owns it, and this allows it to poll it repeatedly (nothing is borrowed, and the MutexGuard is not being "leaked" anywhere, which is what the compiler was complaining about before).
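For anyone who wants to see the same ownership pattern in isolation, here is a rough standard-library analogue, with std::iter::from_fn standing in for futures::stream::poll_fn and std::sync::mpsc standing in for the futures channel. The names `drain_shared` and `collect_pending` are invented for this sketch. It is not async, it only shows the closure owning the Arc and locking on each pull.

```rust
use std::iter;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};

// The closure owns the Arc and locks the mutex on every pull,
// so nothing borrowed escapes and the guard lives only per call.
fn drain_shared(rx: Arc<Mutex<Receiver<String>>>) -> impl Iterator<Item = String> {
    iter::from_fn(move || rx.lock().unwrap().try_recv().ok())
}

// Sends `msgs`, drains them, then drains again through a second
// iterator built from the same shared receiver.
fn collect_pending(msgs: &[&str]) -> Vec<String> {
    let (tx, rx) = channel::<String>();
    let shared = Arc::new(Mutex::new(rx));

    for m in msgs {
        tx.send(m.to_string()).unwrap();
    }
    // First "connection": stops when the channel is momentarily empty.
    let mut seen: Vec<String> = drain_shared(Arc::clone(&shared)).collect();

    // The receiver was never consumed, so a later iteration can
    // wrap the same Arc again and keep reading.
    tx.send("late".to_string()).unwrap();
    seen.extend(drain_shared(shared));
    seen
}

fn main() {
    println!("{:?}", collect_pending(&["a", "b"]));
}
```

This is what makes the receiver reusable across reconnect attempts: each attempt builds a new wrapper around the shared receiver instead of moving the receiver itself.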

All that said, I didn't actually try running the above - it does compile though :)


Hello, thanks a lot for the code.

I made the following modifications to the code before running it.
In waitForMsgsFromServers:

let client = f.map_err(|_| panic!());

is replaced by

let client = f.map_err(|err| {
    println!("Connection err {}", err);
    err
});

Also, waitForMsgsFromServers should be called again when there is an error sending data to Process2, so I modified the loop_fn logic as follows:

    let infinite = loop_fn(rx_clone, move |rx| {
             waitForMsgsFromServers(&test, rx.clone()).map(|_| {
                 Loop::Break(())
             }).map_err(|_| -> Loop<(), Arc<Mutex<mpsc::Receiver<String>>>> {
           	println!("Error in connection, continuing");
           	Loop::Continue(rx)
           })
         });

This code captures the error when sending data to Process2 fails, and the log

Error in connection, continuing

is displayed. But the problem is that it never loops again.
Is there something wrong with returning 'Loop::Continue' inside 'map_err'?

Nothing jumps out at me. Can you add more println's to trace the flow and see what’s happening? For example, you do return Loop::Break if waitForMsgsFromServers completes successfully - maybe one of those futures resolves without error? Or do you know for a fact that the loop isn’t restarted?

Indeed, the loop does not restart when Loop::Continue is returned inside 'map_err'.
But using 'or_else' instead of 'map_err' seems to solve the problem.
Thank you so much for all the help!!!

Below is the working code:

let infinite = loop_fn(rx_clone, move |rx| {
    waitForMsgsFromServers(&test, rx.clone()).map(|_| {
        Loop::Break(())
    }).or_else(|err| -> Result<Loop<(), Arc<Mutex<mpsc::Receiver<String>>>>> {
        println!("Error in connection, continuing");
    	Ok(Loop::Continue(rx))
    })
});

I'm still wondering why the loop does not restart in the 'map_err' case.

Ah, of course - silly us. map_err transforms the error of the underlying future and returns the value as the error. That makes the LoopFn future resolve with an error (the error value is the Continue but that doesn’t matter). We want the underlying future to return Continue as the value, not error, even if underlying future errors (ie swallow the underlying error, so to speak) - or_else does that.
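The same distinction shows up on a plain std Result, whose map_err and or_else have the same shape as the future combinators. A minimal sketch, with a hypothetical `Loop` enum and helper names (`with_map_err`, `with_or_else`) made up for the illustration:

```rust
// Hypothetical stand-in for futures::future::Loop.
#[derive(Debug, PartialEq)]
enum Loop<T, S> {
    Break(T),
    Continue(S),
}

// map_err only changes the error value: a failed attempt is still Err,
// so the Continue is trapped on the error side.
fn with_map_err(attempt: Result<(), String>, state: u32) -> Result<Loop<(), u32>, Loop<(), u32>> {
    attempt
        .map(|_| Loop::Break(()))
        .map_err(|_| Loop::Continue(state))
}

// or_else can recover: a failed attempt becomes Ok(Continue),
// which is what a looping driver needs to see to go around again.
fn with_or_else(attempt: Result<(), String>, state: u32) -> Result<Loop<(), u32>, String> {
    attempt
        .map(|_| Loop::Break(()))
        .or_else(|_| Ok(Loop::Continue(state)))
}

fn main() {
    let failed = || Err::<(), String>("send failed".to_string());
    println!("{:?}", with_map_err(failed(), 7));
    println!("{:?}", with_or_else(failed(), 7));
}
```

In the map_err version the driver would see an Err and stop, regardless of what is inside it; in the or_else version it sees Ok(Continue(..)) and loops.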
