How to select over a TcpStream and an mpsc stream


#1

Hello, I am trying to build a stateful TCP server where each client is a simple task.

Each client listens for two kinds of messages: one from the TCP socket, the other from an mpsc channel.

I want to combine the two: do a select on both streams and then spawn the combined stream on the executor.

But I am not sure how to do that.

    let socketStream = rx.for_each(move |msg| {
        println!("Message Received from socket stream {:?}", msg.Body);
        Ok(())
    }).then(|_| {
        println!("Socket closed....!");
        Ok(())
    });

    let chanStream = rx_chan.for_each(move |msg| {
        println!("Received message from channel {:?}", msg.Body);
        Ok(())
    }).then(|_| {
        println!("Receiver channel got closed.");
        Ok(())
    });

    let streamer = socketStream
        .select(chanStream)
        .then({Ok(())});

    executor.spawn(streamer);

This throws an error, and I am not sure how to fix it.


#2

Can you paste the error?


#3

.then({Ok(())});
   |          ^^^^ the trait `std::ops::FnOnce<(std::result::Result<((), futures::future::SelectNext<futures::future::Then<futures::stream::ForEach<futures::stream::SplitStream<tokio_io::codec::Framed<tokio::net::TcpStream, codec::MyCodec>>, [closure@src/client.rs:24:41: 27:10], std::result::Result<(), std::io::Error>>, std::result::Result<(), _>, [closure@src/client.rs:27:17: 30:11]>, futures::future::Then<futures::stream::ForEach<futures::sync::mpsc::Receiver<codec::Message>, [closure@src/client.rs:32:44: 35:10], std::result::Result<(), ()>>, std::result::Result<(), _>, [closure@src/client.rs:35:17: 38:11]>>), (_, futures::future::SelectNext<futures::future::Then<futures::stream::ForEach<futures::stream::SplitStream<tokio_io::codec::Framed<tokio::net::TcpStream, codec::MyCodec>>, [closure@src/client.rs:24:41: 27:10], std::result::Result<(), std::io::Error>>, std::result::Result<(), _>, [closure@src/client.rs:27:17: 30:11]>, futures::future::Then<futures::stream::ForEach<futures::sync::mpsc::Receiver<codec::Message>, [closure@src/client.rs:32:44: 35:10], std::result::Result<(), ()>>, std::result::Result<(), _>, [closure@src/client.rs:35:17: 38:11]>>)>,)>` is not implemented for `std::result::Result<(), _>`

#4

That code should be:

  .then(|_| Ok(()));

#5

I was actually stuck here. Now, what type should I give here?

 .then(|_|Ok(()));
   |                ^ consider giving this closure parameter a type

#6

Try this:

    let streamer = socketStream
        .select(chanStream)
        .map_err(|_| ())
        .map(|_| ());

#7

Same error.
I think Rust is not able to infer the type.

But I don’t know which type to give here.

.map_err(|_| ())
   |                   ^ consider giving this closure parameter a type

#8

Sorry for the back and forth as I’m on mobile.

Looks like it can’t deduce the Error type of the underlying futures representing the streams. Can you try:

    let socketStream = rx.for_each(move |msg| {
        println!("Message Received from socket stream {:?}", msg.Body);
        Ok(())
    }).then(|_| {
        println!("Socket closed....!");
        Ok::<_, ()>(())
    });

    let chanStream = rx_chan.for_each(move |msg| {
        println!("Received message from channel {:?}", msg.Body);
        Ok(())
    }).then(|_| {
        println!("Receiver channel got closed.");
        Ok::<_, ()>(())
    });
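
The reason the annotation helps: a bare `Ok(())` only fixes the success type, so the compiler has nothing to infer the error type from. The effect can be reproduced with the standard library alone. The sketch below is not tokio code; `demo` and `on_closed` are hypothetical names standing in for the closure passed to `.then(...)`.

```rust
// Std-only illustration of the inference problem; `demo` and `on_closed`
// are hypothetical names, not part of the original program.
fn demo() -> bool {
    // With a plain `Ok(())` in this closure the error type would stay
    // unconstrained and rustc would ask for a type annotation.
    // The turbofish names the error type and leaves the success type inferred.
    let on_closed = |_: ()| Ok::<_, ()>(());

    let result = on_closed(());
    result.is_ok()
}

fn main() {
    assert!(demo());
}
```

Annotating the closure's return type, or the type of the variable holding the result, should work equally well; the turbofish is just the smallest change to the existing code.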

#9

Got it, it is working. Thank you very much. One question, though: if one of the streams finishes, select will complete and both streams will be shut down, right?


#10

Essentially, yes.

Select will complete/resolve as soon as either future does, and you’ll get a tuple back: the item of the future that completed first and another future (the SelectNext in your error message) representing the eventual completion of the other one. If you don’t hold on to that other future then it’ll get dropped and its work will be canceled.
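
A rough, std-only sketch of that behaviour, for illustration. It does not use the futures crate: `Countdown`, `NoopWake`, and `select_first` are hypothetical names, and the busy polling loop only stands in for a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for a loop that polls unconditionally.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future: becomes ready after `remaining` polls and records
// in `dropped` when it is destroyed.
struct Countdown {
    remaining: u32,
    name: &'static str,
    dropped: Arc<AtomicBool>,
}

impl Future for Countdown {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready(self.name)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

impl Drop for Countdown {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

// Polls both futures in turn and returns the first output together with
// the future that has not finished yet.
fn select_first(mut a: Countdown, mut b: Countdown) -> (&'static str, Countdown) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = Pin::new(&mut a).poll(&mut cx) {
            return (v, b);
        }
        if let Poll::Ready(v) = Pin::new(&mut b).poll(&mut cx) {
            return (v, a);
        }
    }
}

fn main() {
    let socket_dropped = Arc::new(AtomicBool::new(false));
    let chan_dropped = Arc::new(AtomicBool::new(false));
    let socket = Countdown { remaining: 1, name: "socket", dropped: socket_dropped.clone() };
    let chan = Countdown { remaining: 5, name: "channel", dropped: chan_dropped.clone() };

    let (first, other) = select_first(socket, chan);
    println!("{} finished first", first);

    // Nothing polls `other` any more; dropping it abandons its remaining work.
    drop(other);
    println!("channel future dropped: {}", chan_dropped.load(Ordering::SeqCst));
}
```

In the code from this thread, `.map(|_| ())` discards the tuple, so the unfinished future is dropped in the same way as `other` is here.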


#11

OK, do you think the code I have written is the recommended way to handle async clients? I did this because I don’t want to spawn many tasks.

Or should I handle the two streams separately and spawn two different tasks, one listening to each stream?

By the way, how much overhead does a task add? For example, in Erlang every process has about 200 bytes of overhead, and Go allocates about 2 KB per goroutine, so I just wanted to know the equivalent figure here.


#12

This depends on whether the tcp stream + mpsc form a logical unit of work or they’re valid independently. If they’re a unit then select is the way to go.

Tasks/futures in tokio are pretty lightweight - they’re basically little state machines encoded into the types. I wouldn’t worry about their overhead in terms of deciding whether to spawn “many” tasks or not. Instead, decide based on the logical aspect I mentioned in the first paragraph.

In general, for TCP, you’ll have one spawned task/future per TCP connection.
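
To make the "little state machine" point concrete, here is a hand-written, std-only sketch of the kind of type a per-connection future boils down to. `ClientTask` and its fields are hypothetical, and the number it prints covers only the future itself; whatever bookkeeping the executor adds per spawned task is not measured here.

```rust
use std::future::Future;
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical per-connection task written out as an explicit state machine.
enum ClientTask {
    // Still working: holds the client id and a small read buffer.
    Reading { id: u64, buf: [u8; 64] },
    // Finished: nothing left to store.
    Done,
}

impl Future for ClientTask {
    type Output = u64;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
        // Move to `Done` and act on the state we were in before.
        match std::mem::replace(&mut *self, ClientTask::Done) {
            ClientTask::Reading { id, buf } => Poll::Ready(id + buf.len() as u64),
            ClientTask::Done => panic!("polled after completion"),
        }
    }
}

fn main() {
    // The whole task state is this one value: no separate stack is allocated.
    println!("ClientTask occupies {} bytes", size_of::<ClientTask>());
}
```

The size is dominated by whatever the task keeps alive between polls (here the 64-byte buffer), which is why the per-task cost depends on your own state rather than on a fixed stack size.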