Tokio + std::sync::io::mpsc - future cannot be sent between threads safely?

Help... What is this error trying to tell me..?

   Compiling rust_playground_2022-11-07 v0.1.0 (/home/user/Development/rust_playground_2022-11-07)
error: future cannot be sent between threads safely
   --> src/main.rs:15:18
    |
15  |     tokio::spawn(data_sender(rx));
    |                  ^^^^^^^^^^^^^^^ future returned by `data_sender` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<String>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:79:32
    |
77  |     for s in rx.iter() {
    |              -- has type `&std::sync::mpsc::Receiver<String>` which is not `Send`
78  |         println!("{s}");
79  |         sock.send(s.as_bytes()).await;
    |                                ^^^^^^ await occurs here, with `rx` maybe used later
80  |     }
    |     - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:77:14
    |
77  |     for s in rx.iter() {
    |              ^^^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `rust_playground_2022-11-07` due to previous error

Code I was trying to compile / run:

use notify::{Config, PollWatcher, RecursiveMode, Watcher};
use tokio::net::{UdpSocket}; // , RecommendedWatcher -> can only be used for normal files, not /proc & /sys
use std::{fs, time::Duration, path::Path, io, sync::mpsc::{self, Receiver, Sender}, net::Ipv4Addr};
//use tokio::{net::UdpSocket};


#[tokio::main]
async fn main() -> io::Result<()> {
    println!("rust main start");

    // https://www.koderhq.com/tutorial/rust/concurrency/
    // create send/receiver vars to move data through channel
    let (tx, rx) = mpsc::channel::<String>();

    tokio::spawn(data_sender(rx));

    tokio::spawn(watch_files(tx));
    
    println!("rust main end");
    Ok(())
}

fn print_name_and_content(path: &Path) -> String {
    let contents = fs::read_to_string(path)
        .expect("Should have been able to read the file");
    format!("{}: {contents}", path.display())
}

async fn watch_files(tx_udp: Sender<String>) {
    let (tx, rx) = std::sync::mpsc::channel();

    let config = Config::default()
    .with_poll_interval(Duration::from_millis(20000))
    .with_compare_contents(true);
    let mut watcher = PollWatcher::new(tx, config).unwrap();

    let mut i: u8 = 0;    
    while i < std::u8::MAX {
      let format = format!("/sys/class/thermal/thermal_zone{i}/temp");
      let path = Path::new(&format);
      if path.exists() {
        tx_udp.send(print_name_and_content(&path)).unwrap();
        watcher
        .watch(path.as_ref(), RecursiveMode::Recursive)
        .unwrap();
        i=i+1;
      }
      else {
        i = std::u8::MAX;
      }
    }


    println!("watch_files setup done, starting loop");
    for res in rx {
        match res {
            Ok(event) => {
                for path in &event.paths {
                    tx_udp.send(print_name_and_content(&path)).unwrap();
                };
                println!("changed: {:?}", event);
            },
            Err(e) => println!("watch error: {:?}", e),
        }
    }
}

async fn data_sender(rx: Receiver<String>) -> io::Result<()> {
    // https://stackoverflow.com/questions/65130849/how-do-i-connect-an-udp-socket-and-let-the-os-choose-the-port-and-address
    // addr: (UNSPECIFIED, 0) means system decides ip and port used
    let sock = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
    println!("bound socket: {:?}", sock);

    let remote_addr = "127.0.0.1:2003";
    sock.connect(remote_addr).await?;

    for s in rx.iter() {
        println!("{s}");
        sock.send(s.as_bytes()).await?;
    }
    Ok(())
}

Maybe I'm just too tired. Gonna go sleep now and see if I find more time to research this error tomorrow.. Good night..

It's easier to use an async channel instead of a synchronous channel within an async function.

use tokio::sync::mpsc::{self, Receiver, Sender};

async fn data_sender(mut rx: Receiver<String>) -> io::Result<()> {
    let sock = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
    println!("bound socket: {:?}", sock);

    let remote_addr = "127.0.0.1:2003";
    sock.connect(remote_addr).await?;

    while let Some(s) = rx.recv().await {
        println!("{s}");
        sock.send(s.as_bytes()).await?;
    }
    Ok(())
}

As for what the error message is trying to tell you, it's saying that everything you send across thread boundaries must implement Send. The future created by your function does not implement Send because it is storing a &Receiver<String> (which is !Send) across an await point.

This is the important part of the error message:

77  |     for s in rx.iter() {
    |              -- has type `&std::sync::mpsc::Receiver<String>` which is not `Send`
1 Like

Thank you for that hint!

Got that sample working now:

use notify::{Config, PollWatcher, RecursiveMode, Watcher};
use tokio::{net::{UdpSocket}, sync::mpsc::{self, Sender, Receiver}}; // , RecommendedWatcher -> can only be used for normal files, not /proc & /sys
use std::{fs, time::Duration, path::Path, io, net::Ipv4Addr};

#[tokio::main]
async fn main() -> io::Result<()> {
    println!("rust main start");

    // https://www.koderhq.com/tutorial/rust/concurrency/
    // create send/receiver vars to move data through channel
    let (tx, rx) = mpsc::channel::<String>(1024);

    tokio::spawn(data_sender(rx));

    watch_files(tx).await;
    
    println!("rust main end");
    Ok(())
}

fn print_name_and_content(path: &Path) -> String {
    let contents = fs::read_to_string(path)
        .expect("Should have been able to read the file");
    let format = format!("{}: {contents}", path.display());
    println!("{format}");
    format
}

async fn watch_files(tx_udp: Sender<String>) {
    let (tx, rx) = std::sync::mpsc::channel();

    let config = Config::default()
    .with_poll_interval(Duration::from_millis(20000))
    .with_compare_contents(true);
    let mut watcher = PollWatcher::new(tx, config).unwrap();

    let mut i: u8 = 0;    
    while i < std::u8::MAX {
      let format = format!("/sys/class/thermal/thermal_zone{i}/temp");
      let path = Path::new(&format);
      if path.exists() {
        tx_udp.send(print_name_and_content(&path)).await.unwrap();
        watcher
        .watch(path.as_ref(), RecursiveMode::Recursive)
        .unwrap();
        i=i+1;
      }
      else {
        i = std::u8::MAX;
      }
    }

    println!("watch_files setup done, starting loop");
    for res in rx {
        match res {
            Ok(event) => {
                for path in &event.paths {
                    tx_udp.send(print_name_and_content(&path)).await.unwrap();
                };
                println!("changed: {:?}", event);
            },
            Err(e) => println!("watch error: {:?}", e),
        }
    }
}

async fn data_sender(mut rx: Receiver<String>) -> io::Result<()> {
    // https://stackoverflow.com/questions/65130849/how-do-i-connect-an-udp-socket-and-let-the-os-choose-the-port-and-address
    // addr: (UNSPECIFIED, 0) means system decides ip and port used
    let sock = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
    println!("bound socket: {:?}", sock);

    let remote_addr = "127.0.0.1:2003";
    sock.connect(remote_addr).await?;

    while let Some(s) = rx.recv().await {
        println!("{s}");
        sock.send(s.as_bytes()).await?;
    }
    Ok(())
}