Mpsc cross-thread issue to read values

Hello,
I'm very new at Rust, and I am exploring threads at the moment :slight_smile:

I'm trying to get the main start a thread: the main thread accepts the user's inputs and the second thread (task::spawn) sends the user's input to the connection (streams).

pub fn main() {
    let (broker_sender, broker_receiver) = mpsc::unbounded::<Event>();
    task::spawn(accept_loop("127.0.0.1:7777", broker_receiver));
    loop {
        let mut user_input = String::new();
        io::stdout().flush().unwrap();
        io::stdin().read_line(&mut user_input).unwrap();
        let user_input_clean = user_input.trim();
        if user_input_clean.starts_with("/send") {
            let to_send: String = user_input_clean.chars().skip(5).collect();
            broker_sender.send(Event::Message { msg: to_send });
        }
    }
}

async fn accept_loop(
    addr: impl ToSocketAddrs,
    broker: Receiver<Event>,
) -> Result<(), std::io::Error> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    let broker = Arc::new(broker);

    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        let stream = Arc::new(stream);
        task::spawn(connection_loop(Arc::clone(&stream), broker));
    }
    Ok(())
}

async fn connection_loop(
    stream: Arc<TcpStream>,
    broker: Arc<Receiver<Event>>,
) -> Result<(), std::io::Error> {
    let mut stream = &*stream;
    //let broker_value = Arc::clone(&broker);
    while let Ok(event) = broker.try_next() {
        match event.unwrap() {
            Event::Message { msg } => {
                stream.write_all(msg.as_bytes());
            }
            Event::NewPeer { ip_port, stream } => {}
        }
    }
    Ok(())
}

My current issue is with the mpsc::unbounded. I figure out that I need the broker to communicate between the thread, but I keep getting compilation errors. I think I need to use Arc, but I have had mild success. Currently, the code above error is cannot borrow data in an Arc as mutable. Trying to use the commented line, change the error for cannot borrow data in an Arc as mutable. I also have tried to do like the stream and use &* to access the broker receiver, but it gives cannot borrow *brokeras mutable, as it is behind a& reference.

Let me know if I am using the wrong technic to share data from one thread to another one or to guide me to the right path. Thanks in advance!

Wrapping a Receiver in an Arc very likely indicates a mistake...

You're trying to share the receiver. The hint to the trouble is in the name: mpsc = multiple producer single consumer. That "single consumer" bit is important. That means the Receiver is not sharable no matter how it's packaged.

I suspect you're looking for the opposite: single producer multiple consumer. I think you're trying to build a multicast thing; send the same data to multiple network endpoints. Is that correct?

You are right, I might be using the wrong library to share. My goal is to have a single producer (the user input) and to share it on many threads (the data will be sent to each socket).

What would be your recommendation?

Well ... I thought crossbeam might help but, as far as I can tell, it's strictly for threads.

Recently I was faced with a similar problem. I created one channel for each Task, put the Sender into a Vec, then iterated over the Senders for each message. It wasn't too difficult to get correct. The one caveat is that the messages have to be Clone. For you, that would be the time to bring Arc into the picture. You would wrap to_send in an Arc to make cloning cheap then send that to each Task.

Make sense?

tokio::sync::broadcast is probably what you want. You should be aware of "lagging" in the docs, though. The channel will apply back pressure when necessary to avoid CPU and memory contention. Tuning the channel size is an exercise in knowing your application requirements and system constraints.

1 Like

Thank you @Coding-Badly for the suggestion. Having a vec with the message totally makes sense, as each thread can read the message at a different time.

Let me try to implement your design. I'll follow on that thread if I succeed (or not) :slight_smile:

@parasyte I haven't yet explored tokio. I'll add it to my list of learning materials about Rust. Thank you for the suggestion.

1 Like

I have done a quick experiment that did not work as expected.

I thought I could borrow in "read-only" the vector, trying to use a portion of your idea about iterating the list of streams to send the message.

use std::io;
use std::io::Write;

use async_std::{
    net::{TcpListener, TcpStream, ToSocketAddrs},
    prelude::*,
    sync::{Arc,},
    task,
};

#[derive(Debug)]
enum Event {
    NewPeer {
        ip_port: String,
        stream: Arc<TcpStream>,
    },
    Message {
        msg: String,
    },
}
pub fn main() -> () {
    let streams: Vec<TcpStream> = Vec::new();
    task::spawn(accept_loop("127.0.0.1:7777", streams));
    loop {
        let mut user_input = String::new();
        io::stdout().flush().unwrap();
        io::stdin().read_line(&mut user_input).unwrap();
        let user_input_clean = user_input.trim();
        if user_input_clean.starts_with("/send") {
            let to_send: String = user_input_clean.chars().skip(5).collect();
            send_to_all_streams(Event::Message { msg: to_send }, &streams);
        } 
    }
}

async fn accept_loop(
    addr: impl ToSocketAddrs,
    mut streams: Vec<TcpStream>,
) -> Result<(), std::io::Error> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();

    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        streams.push(stream);
    }
    Ok(())
}

async fn send_to_all_streams(event: Event, streams: &[TcpStream]) -> Result<(), std::io::Error> {
    match event {
        Event::Message { msg } => {
            for mut stream in streams {
                println!("Sending to {} the message {}", stream.peer_addr()?, msg);
                stream.write_all(msg.as_bytes());
            }
        }
        Event::NewPeer { ip_port, stream } => {
            println!("Not yet developed");
        }
    }

    Ok(())
}

Playground Link

What I was trying to do is to have the main loop to get the user input and the accept_loop to get new connections as they arrive. When the user hits enter, take the string and send the message to all existing streams. I understand that the vector can change anytime, so I see that I need to copy the existing vector to loop the existing stream to send the message.

The line send_to_all_streams(Event::Message { msg: to_send }, &streams); does not work with the exception borrow of moved value: streams.

Do you have an idea how I can around? Or maybe my approach is wrong. I know you mentioned to create one channel for each task but I am unsure how it could fit in my current situation.

Crud. Like @parasyte I got it in my head you are using Tokio. I may not be much help with async-std.

That looks blocking. Which means it will block everything else from running.

Yeah. I think io and io::Write should be from async_std...

use std::io;
use std::io::Write;

Yes, the lack of clarity in the OP describing the crates in use was a bit confusing.

Even more confusing is why std::io is being considered at all in an async context. @Coding-Badly's suggestion to stick with async-std types and exports is the way to go.

On the same topic, async-std has a channel type that is mpmc: async_std::channel. (Not mpsc as mentioned in the OP; which I suspect was from std::sync - yikes!) An mpmc channel is conceptually in the same ballpark as Tokio's broadcast.

1 Like

Thanks, @parasyte and @Coding-Badly, for all the effort.

Like I previously said, I am learning Rust, and my small application to give a try with thread was a basic application that allows the user to type a string in the console and send it to many TCP connections. I expect I am not using the right approach. Hence I am trying to reach out to get some guidance.

I am using the std::io from what I could find on the documentation to read input. I thought that if it the input where blocking inside the main thread, that because the communication was in parallel in another thread to get connections and send data would be enough. I may be wrong there. Similar to mpsc which I found in the documentation when learning Rust. I might have miss used also :slight_smile:

If the normal practice is to rely on the Tokio crate, I'll get in that direction.

So, if we step back. I should :slight_smile:

  1. Explore the async_std library to read the user input without blocking the main thread
  2. Explore the Tokio crate to perform the async connection/sending message

What remains unclear is how to share the message from the async_std thread that take the input into the streams. From that thread of discussion, the idea of having a vec of streams and sending to each stream from the async_std thread (input) can work? I am still unclear how the input and connection threads can share the vec of stream. One will always only read (to send the text) and always add (add new connections as they come in).

Again, thanks a lot for all your precious knowledge!

async != thread

While async feels like threading it is not. I suggest picking one or the other until you get some experience with both. (The right choice is async. :grin:)

I believe Tokio also has an async replacement for stdin. If you go with Tokio go all in.

1 Like

I personally can't place any blame on OP for the confusion. async-std and tokio are (for all intents and purposes) incompatible competing async runtimes. Combine that with the fact that std predates async/await by a significant number of years and it's probably quite common to mix up blocking and non-blocking/async until one becomes familiar with the differences.

The async WG has done a good job (IMHO) with learning materials, e.g. :crystal_ball: The vision - wg-async (rust-lang.github.io) but there is still more left to do! :sweat_smile: FWIW, async-std and tokio documentation and blogs are also both excellent resources for learning material!

Thanks @Coding-Badly and @parasyte. I think I'll step back and clarify the async/thread and take a deeper look at Tokio to have a stronger foundation before exploring the small demo I was trying to setup.

I already took a lot of both of your time, I'll close this thread and open other ones with more specific questions as I embrace the journey of learning Rust. Thanks again and see you soon :wave:

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.