Tokio TcpStream + Mpsc


#1

Hi, I am implementing kind of proof-of-concept using the new Tokio-API. The TcpListener shall send a message to a Mpsc-Receiver in case of a connection. The listener is working as expected, but the the Mpsc-Receiver’s for_each task is not triggered.

All I get to see is:

    server running on localhost:6142
    accepted socket; addr=V4(127.0.0.1:32884)
    wrote message; success=true

Any idea whats going wrong?
(full code at https://github.com/frehberg/rust-tokio-playground)

pub fn main() {
        let mut runtime = tokio::runtime::Runtime::new().unwrap();
        let addr = "127.0.0.1:6142".parse().unwrap();
        let (tx, rx) = mpsc::channel(1);
        let listener = TcpListener::bind(&addr).unwrap();
        let server =
            listener.incoming().for_each(move |socket| {
                println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
                let tx = tx.clone();
                tx.send(PortEvent {});
                let port = PortStream(socket);
                let connection =
                    io::write_all(port, "hello world\n")
                        .then(|res| {
                            println!("wrote message; success={:?}", res.is_ok());
                            Ok(())
                        });
                // Spawn a new task that processes the socket:
                tokio::spawn(connection);

                Ok(())
            })
                .map_err(|err| {
                    println!("accept error = {:?}", err);
                });


        let f2 = rx.for_each(|event| {
            println!("Message = {:?}", event);
            Ok(())
        });

        println!("server running on localhost:6142");
        runtime.spawn(server);
        runtime.spawn(f2);

        // Start the Tokio runtime
        runtime.shutdown_on_idle().wait().unwrap();
}

#2
tx.send(PortEvent {});

This returns a future that’s immediately dropped - given this is a bounded channel, I believe the future needs to be driven by the event loop; if it’s dropped prior to being polled, no transmission occurs.


#3

@vitalyd many thanks, using a combinator solved the issue :slight_smile:
The lines in question look similar to this now

        let connection =
            io::write_all(port, "hello world\n")
                .then( | res | {
                    let (port, buf) = res.ok().unwrap();
                    port.tx.clone().send( PortEvent {mesg: "data sent"} )
                })
                .then( |res| {
                    println!("wrote message; success={:?}",res.is_ok());
                    Ok(())
                });

        // Spawn a new task that processes the socket:
        tokio::spawn(connection);