Hello All,
I’m developing an application that reads data from a Unix datagram socket and an mpsc channel concurrently. Here is the code snippet.
const SERVER_PATH: &'static str = "/tmp/server";
const CLIENT_PATH: &'static str = "/tmp/client";
pub struct StringDatagramCodec;
impl UnixDatagramCodec for StringDatagramCodec {
type In = String;
type Out = String;
fn decode(&mut self, _src: &std::os::unix::net::SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
let decoded = String::from_utf8_lossy(buf).into_owned();
Ok(decoded)
}
fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<PathBuf> {
buf.extend_from_slice(&msg.into_bytes());
Ok(Path::new(SERVER_PATH).to_path_buf())
}
}
// This is a simple server; it implements 'Service' and forwards the received HTTP request URIs on 'tx'.
pub struct Sampleserver {
pub tx: mpsc::Sender<String>,
}
pub fn create_framed_sock(handle: &tokio_core::reactor::Handle) -> tokio_uds::UnixDatagramFramed<StringDatagramCodec>
{
let sock = UnixDatagram::bind(CLIENT_PATH, &handle).unwrap().framed(StringDatagramCodec);
sock
}
pub fn waitForMsgsFromServers(handle: &tokio_core::reactor::Handle, rx: mpsc::Receiver<String>)
{
let sock = create_framed_sock(handle);
let f = sock.send("START".to_string()).and_then(move |sock| {
let (sink, stream) = sock.split();
// wait for msgs from server which is bound to 'SERVER_PATH'
let mut mapped_stream = stream.map(move |msg| {
let mut ret = msg.to_string();
// <do some processing with the data>
ret
}).select(rx.map_err(|()| { // wait for msgs from Sampleserver
use std::io::{Error, ErrorKind};
Error::new(ErrorKind::Other, "end of stream")
}));
sink.send_all(mapped_stream)
});
handle.spawn(f.map(|_| ()).map_err(|_| ()));
}
fn main(){
let (tx, rx) = mpsc::channel::<String>(5); // extra buffer
let http_server = {
Http::new().bind(&addr, move || {
Ok(http_hyper::http_hyper::Sampleserver::new(
tx.clone(),
))
})?
};
waitForMsgsFromServers(&handle, rx);
http_server.run()?;
}
Let ‘Process2’ be the process that creates the Unix domain socket bound to the path “/tmp/server”.
The above code works fine and receives data from both ‘Sampleserver’ and ‘Process2’.
However, problems occur when ‘Process2’ exits (‘Process2’ deletes the socket "/tmp/server" on exit). These are the main ones:
* ‘waitForMsgsFromServers’ no longer receives messages sent from ‘Sampleserver’.
* There is a chance that ‘Process2’ will be re-spawned as soon as it exits. ‘waitForMsgsFromServers’ should then be able to reconnect to the newly created socket ("/tmp/server"). In this case, I would like to call ‘waitForMsgsFromServers’ again, but I am not sure where exactly to make this call, given that ‘rx’ cannot be reused.
Will it work if errors from ‘sink.send_all’ are captured?
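To make the idea behind this question concrete, here is a minimal sketch of "capture the send error and reconnect while keeping ‘rx’ alive". It is an assumption-laden illustration, not the tokio/futures code above: it uses the blocking std::sync::mpsc channel instead of the futures channel, and the names MsgSink, forward_until_error, run_with_reconnect, and the connect factory are hypothetical stand-ins for the datagram sink and for re-creating the socket.

```rust
use std::io;
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for the datagram sink (e.g. the socket bound to CLIENT_PATH).
pub trait MsgSink {
    fn send_msg(&mut self, msg: &str) -> io::Result<()>;
}

/// Forwards messages from `rx` to `sink` until the channel closes or a send fails.
/// `rx` is only borrowed, so the caller still owns it after an error.
/// A message whose send failed is parked in `pending` so it can be retried.
pub fn forward_until_error<S: MsgSink>(
    rx: &Receiver<String>,
    sink: &mut S,
    pending: &mut Option<String>,
) -> io::Result<()> {
    loop {
        // Retry a message left over from a previously failed connection first.
        let msg = match pending.take() {
            Some(m) => m,
            None => match rx.recv() {
                Ok(m) => m,
                Err(_) => return Ok(()), // all senders dropped: clean shutdown
            },
        };
        if let Err(e) = sink.send_msg(&msg) {
            *pending = Some(msg);
            return Err(e);
        }
    }
}

/// Owns `rx` for the whole run and reconnects whenever forwarding fails.
/// `connect` is a hypothetical factory that (re)creates the sink, for example
/// by re-binding the client socket. Returns the number of connections made.
pub fn run_with_reconnect<S, F>(
    rx: Receiver<String>,
    mut connect: F,
    max_connects: usize,
) -> io::Result<usize>
where
    S: MsgSink,
    F: FnMut() -> io::Result<S>,
{
    let mut pending = None;
    let mut connects = 0;
    loop {
        if connects == max_connects {
            return Err(io::Error::new(io::ErrorKind::Other, "too many reconnects"));
        }
        let mut sink = connect()?;
        connects += 1;
        match forward_until_error(&rx, &mut sink, &mut pending) {
            Ok(()) => return Ok(connects), // channel closed, nothing left to forward
            Err(_) => continue,            // peer went away: build a new sink, keep `rx`
        }
    }
}
```

The point of the sketch is the ownership split: the receiver lives in the outer loop and is only lent to each connection attempt, so a send error ends one connection without dropping ‘rx’. Whether the same split can be expressed with ‘sink.send_all’ in the futures-based code is exactly the open question here.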
Could anyone help me with the above problems?
Thanks in Advance!!!