Why mpsc channel receiver stream is closing as soon as created


#1

Hello, i am trying to open up receiver stream of mpsc channel ,

let (tx_chan, rx_chan) = mpsc::channel::<Message>(1);
  let chanStream = rx_chan.for_each(move |msg|{
             println!("Received message from channel {:?}", msg.Body);
             //process the socket. 
             //then bind this to sink at the end.
             //We need to create pipe line.
             Ok(())
        }).then(|_|{
            println!("Receiver channel got closed.");
            Ok::<_, ()>(())
         });

    
         executor.clone().spawn(socketStream);

This is getting closed as soon as created.

Any reason for that ?


#2

My first guess is it errors immediately, can look at it later.


#3

i tried putting map_err in between, not receiving any errors.


#4
  1. What’s keeping the tx part alive?
  2. Where are you spawning the chanStream?

#5
use tokio::net::TcpStream;
use tokio::runtime::TaskExecutor;
use codec::MyCodec;
use codec::Message;
use tokio::prelude::*;
use futures::sync::mpsc::{Sender, Receiver};
use futures::sync::mpsc;
use futures::stream::Stream;
use std;
use state::State;
use state::StateMessage;

pub struct Client{
    stateSender: Sender<StateMessage>,
    my_tx: Sender<Message>,
    my_rx: Receiver<Message>,
}

impl Client{
    pub fn new(stateSender: Sender<StateMessage>) -> Self {
        let (tx_chan, rx_chan) = mpsc::channel::<Message>(1);
        Client{
            stateSender: stateSender,
            my_tx: tx_chan,
            my_rx: rx_chan,
        }
    }

    pub fn run(self, socket: TcpStream, executor: TaskExecutor) ->(){

        let framed_socket = socket.framed(MyCodec::new());
        let (tx, rx) = framed_socket.split(); 
        let stateClone = self.stateSender;
        let tx_chan_clone = self.my_tx.clone();

        let message = StateMessage{
            name: "AJ".to_string(),
            chan: Some(tx_chan_clone),
        };

        //Register client with state manager.
        executor.clone().spawn(stateClone.clone().send(message).then(|_|{
            println!("State Registered for client.");
            Ok(())
        }));


         let ec = executor.clone();
         let socketStream = rx.for_each(move |msg|{
             println!("Message Received from socket stream {:?}", msg.Body);
             Ok(())
        }).then(move |_|{
                //DeRegister client with state manager.
                let message = StateMessage{
                    name: "AJ".to_string(),
                    chan: None,
                };
                ec.spawn(stateClone.clone().send(message).then(|_|{
                    println!("State Deregistered for client.");
                    Ok(())
                }));

            println!("Socket closed....!");
            Ok::<_, ()>(())
        });

         let rx_chan = self.my_rx;
         let chanStream = rx_chan.for_each(move |msg|{
             println!("Received message from channel {:?}", msg.Body);
             //process the socket. 
             //then bind this to sink at the end.
             //We need to create pipe line.
             Ok(())
        }).map_err(|err|{
            println!("Error occured...{:?}", err);
            ()
        }).then(|_|{
            println!("Receiver channel got closed.");
            Ok::<_, ()>(())
         });

         executor.clone().spawn(chanStream);
         executor.clone().spawn(socketStream);

        // let streamer = socketStream
        // .select(chanStream)
        // .map(move |_|{()})
        // .then(|_|{ 
        //     println!("Actor closed.");
        //     Ok::<_,()>(())
        // });

//        executor.clone().spawn(streamer); 
        ()
    }
}

#6

As @vitalyd alluded to you need to keep the mpsc::Sender alive, an mpsc::Receiver is closed when all Sender's are dropped. You appear to be sending a clone of the mpsc::Sender out via stateSender, is that being stored somewhere when received or could it be dropped?


#7

it is being stored.

use futures::Stream;
use futures::sync::mpsc::{Receiver, Sender};
use futures::sync::mpsc;
use tokio::runtime::TaskExecutor;
use tokio::prelude::*;
use codec::Message;
use std::collections::HashMap;

pub struct StateMessage{
  pub name: String,
  pub chan: Option<Sender<Message>>,
}

pub struct State{
    count: i64,
    chan: HashMap<String, Sender<Message>>,
    receiver: Receiver<StateMessage>,
}

impl State {
    pub fn new(executor: TaskExecutor) -> (Self, Sender<StateMessage>){
        let (sender, receiver) = mpsc::channel::<StateMessage>(1); 
        let state = State{
            count: 0,
            chan: HashMap::new(),
            receiver: receiver,
        };
        // let rx = receiver.for_each(|msg|{

        //     Ok(())  
        // }).then(|_|{
        //     println!("State Received stopped.");
        //     Ok(())
        // });
        // executor.spawn(rx);
        (state, sender)
    }
}

impl Future for State { 
    type Item = StateMessage;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>{
        match self.receiver.poll() {
             Ok(Async::Ready(Some(msg))) => {
                 println!("Some message received...!!!");
                 self.chan.insert(msg.name, msg.chan.unwrap());
                 Ok(Async::NotReady)
             },
             Ok(Async::Ready(None)) => {
                 println!("NONE" );
                 Ok(Async::NotReady)
             },
             Ok(Async::NotReady) => Ok(Async::NotReady),
             Err(err) => {
                 println!("Error while pooling");
                 Err(())
             },
        }
    } 
}

#8

That poll() implementation doesn’t look correct. If you poll the receiver and get a Async::Ready(Some), you should not return Async::NotReady. If you do, the future will not be polled again. You need to make sure the underlying future (or stream), which is the receiver here, registers itself with the reactor to be notified when it has more data to process. It’ll only do that if it returns NotReady - that’s the general contract of polling futures.

The easiest way to do that is to loop until you get a NotReady, and then return NotReady from State.

You should also consider Ready(None) to mean the underlying stream is done, and your future should resolve at this point; you’re returning NotReady, which isn’t right.

I don’t think this explains the premature termination you’re seeing but it’s an issue nonetheless. If the stream is ending too early, then the sender is being dropped somewhere (or the stream ends with an error). It’s a bit hard to pinpoint via the snippets you’ve posted since the whole picture has to be seen, but that’s something you should investigate.


#9

Thank you very much for your insights. Got to learn a lot.

1 question. In above scenario, i think stream would make more sense than future right ?

In general, when I am expecting infinite receives, i should be using streams. ?


#10

Use a Stream where you want to produce elements as they become available. A Stream is conceptually the same as an Iterator. A Future is a “one shot” concept - it produces a value or error, and then it’s done. So which to pick really depends on what you’re trying to achieve semantically.

For the State struct, if you want to add a channel to the internal HashMap but then yield the element as well, then a Steam is the right abstraction.