Tokio + std::sync::mpsc - future cannot be sent between threads safely?

Help... What is this error trying to tell me..?

   Compiling rust_playground_2022-11-07 v0.1.0 (/home/user/Development/rust_playground_2022-11-07)
error: future cannot be sent between threads safely
   --> src/main.rs:15:18
    |
15  |     tokio::spawn(data_sender(rx));
    |                  ^^^^^^^^^^^^^^^ future returned by `data_sender` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<String>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:79:32
    |
77  |     for s in rx.iter() {
    |              -- has type `&std::sync::mpsc::Receiver<String>` which is not `Send`
78  |         println!("{s}");
79  |         sock.send(s.as_bytes()).await;
    |                                ^^^^^^ await occurs here, with `rx` maybe used later
80  |     }
    |     - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:77:14
    |
77  |     for s in rx.iter() {
    |              ^^^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `rust_playground_2022-11-07` due to previous error

Code I was trying to compile / run:

use notify::{Config, PollWatcher, RecursiveMode, Watcher};
use tokio::net::{UdpSocket}; // , RecommendedWatcher -> can only be used for normal files, not /proc & /sys
use std::{fs, time::Duration, path::Path, io, sync::mpsc::{self, Receiver, Sender}, net::Ipv4Addr};
//use tokio::{net::UdpSocket};


#[tokio::main]
async fn main() -> io::Result<()> {
    println!("rust main start");

    // https://www.koderhq.com/tutorial/rust/concurrency/
    // create send/receiver vars to move data through channel
    let (tx, rx) = mpsc::channel::<String>();

    tokio::spawn(data_sender(rx));

    tokio::spawn(watch_files(tx));
    
    println!("rust main end");
    Ok(())
}

fn print_name_and_content(path: &Path) -> String {
    let contents = fs::read_to_string(path)
        .expect("Should have been able to read the file");
    format!("{}: {contents}", path.display())
}

async fn watch_files(tx_udp: Sender<String>) {
    let (tx, rx) = std::sync::mpsc::channel();

    let config = Config::default()
    .with_poll_interval(Duration::from_millis(20000))
    .with_compare_contents(true);
    let mut watcher = PollWatcher::new(tx, config).unwrap();

    let mut i: u8 = 0;    
    while i < std::u8::MAX {
      let format = format!("/sys/class/thermal/thermal_zone{i}/temp");
      let path = Path::new(&format);
      if path.exists() {
        tx_udp.send(print_name_and_content(&path)).unwrap();
        watcher
        .watch(path.as_ref(), RecursiveMode::Recursive)
        .unwrap();
        i=i+1;
      }
      else {
        i = std::u8::MAX;
      }
    }


    println!("watch_files setup done, starting loop");
    for res in rx {
        match res {
            Ok(event) => {
                for path in &event.paths {
                    tx_udp.send(print_name_and_content(&path)).unwrap();
                };
                println!("changed: {:?}", event);
            },
            Err(e) => println!("watch error: {:?}", e),
        }
    }
}

async fn data_sender(rx: Receiver<String>) -> io::Result<()> {
    // https://stackoverflow.com/questions/65130849/how-do-i-connect-an-udp-socket-and-let-the-os-choose-the-port-and-address
    // addr: (UNSPECIFIED, 0) means system decides ip and port used
    let sock = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
    println!("bound socket: {:?}", sock);

    let remote_addr = "127.0.0.1:2003";
    sock.connect(remote_addr).await?;

    for s in rx.iter() {
        println!("{s}");
        sock.send(s.as_bytes()).await?;
    }
    Ok(())
}

Maybe I'm just too tired. Gonna go sleep now and see if I find more time to research this error tomorrow.. Good night..

It's easier to use an async channel instead of a synchronous channel within an async function.

use std::{io, net::Ipv4Addr};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, Receiver, Sender};

async fn data_sender(mut rx: Receiver<String>) -> io::Result<()> {
    let sock = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
    println!("bound socket: {:?}", sock);

    let remote_addr = "127.0.0.1:2003";
    sock.connect(remote_addr).await?;

    while let Some(s) = rx.recv().await {
        println!("{s}");
        sock.send(s.as_bytes()).await?;
    }
    Ok(())
}

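As for what the error message is trying to tell you: everything you send across thread boundaries must implement Send. The future created by your function does not implement Send because it is holding a &Receiver<String> (the iterator returned by rx.iter() keeps that reference) across an await point. A shared reference &T is only Send if T is Sync, and std::sync::mpsc::Receiver is not Sync (that is what the `help:` line about `Sync` points at), so &Receiver<String> is !Send.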

This is the important part of the error message:

77  |     for s in rx.iter() {
    |              -- has type `&std::sync::mpsc::Receiver<String>` which is not `Send`
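
A minimal, std-only sketch of the rule behind that message, assuming nothing beyond the standard library: `&T` is `Send` only when `T` is `Sync`. `std::sync::mpsc::Receiver` is `Send` but not `Sync`, so an owned receiver may be moved to another thread while a borrowed one may not. The names `assert_send`, `owned_receiver_is_send` and `drain_on_thread` are hypothetical, chosen for this illustration only.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Compiles only if T is Send; used purely as a compile-time check.
fn assert_send<T: Send>() {}

fn owned_receiver_is_send() {
    // An owned Receiver<String> is Send, so it may move to another thread.
    assert_send::<Receiver<String>>();
    // Receiver is not Sync, so a shared reference to it is not Send.
    // Uncommenting the next line makes the compiler reject the program:
    // assert_send::<&Receiver<String>>();
}

// Moves the owned receiver into a worker thread and collects what it receives.
fn drain_on_thread(items: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let worker = thread::spawn(move || rx.iter().collect::<Vec<String>>());
    for item in items {
        tx.send(item).unwrap();
    }
    drop(tx); // closing the channel ends rx.iter() in the worker
    worker.join().unwrap()
}
```

Inside an `async fn` the same rule applies to the future itself: a `rx.iter()` iterator that is still alive at an `.await` stores a `&Receiver`, which makes the whole future `!Send`.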

Thank you for that hint!

Got that sample working now:

use notify::{Config, PollWatcher, RecursiveMode, Watcher};
use tokio::{net::{UdpSocket}, sync::mpsc::{self, Sender, Receiver}}; // , RecommendedWatcher -> can only be used for normal files, not /proc & /sys
use std::{fs, time::Duration, path::Path, io, net::Ipv4Addr};

#[tokio::main]
async fn main() -> io::Result<()> {
    println!("rust main start");

    // https://www.koderhq.com/tutorial/rust/concurrency/
    // create send/receiver vars to move data through channel
    let (tx, rx) = mpsc::channel::<String>(1024);

    tokio::spawn(data_sender(rx));

    watch_files(tx).await;
    
    println!("rust main end");
    Ok(())
}

fn print_name_and_content(path: &Path) -> String {
    let contents = fs::read_to_string(path)
        .expect("Should have been able to read the file");
    let format = format!("{}: {contents}", path.display());
    println!("{format}");
    format
}

async fn watch_files(tx_udp: Sender<String>) {
    let (tx, rx) = std::sync::mpsc::channel();

    let config = Config::default()
    .with_poll_interval(Duration::from_millis(20000))
    .with_compare_contents(true);
    let mut watcher = PollWatcher::new(tx, config).unwrap();

    let mut i: u8 = 0;    
    while i < std::u8::MAX {
      let format = format!("/sys/class/thermal/thermal_zone{i}/temp");
      let path = Path::new(&format);
      if path.exists() {
        tx_udp.send(print_name_and_content(&path)).await.unwrap();
        watcher
        .watch(path.as_ref(), RecursiveMode::Recursive)
        .unwrap();
        i=i+1;
      }
      else {
        i = std::u8::MAX;
      }
    }

    println!("watch_files setup done, starting loop");
    for res in rx {
        match res {
            Ok(event) => {
                for path in &event.paths {
                    tx_udp.send(print_name_and_content(&path)).await.unwrap();
                };
                println!("changed: {:?}", event);
            },
            Err(e) => println!("watch error: {:?}", e),
        }
    }
}

async fn data_sender(mut rx: Receiver<String>) -> io::Result<()> {
    // https://stackoverflow.com/questions/65130849/how-do-i-connect-an-udp-socket-and-let-the-os-choose-the-port-and-address
    // addr: (UNSPECIFIED, 0) means system decides ip and port used
    let sock = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
    println!("bound socket: {:?}", sock);

    let remote_addr = "127.0.0.1:2003";
    sock.connect(remote_addr).await?;

    while let Some(s) = rx.recv().await {
        println!("{s}");
        sock.send(s.as_bytes()).await?;
    }
    Ok(())
}

So far everything worked fine, but now I'd like to use one of those two libs to collect more data (I tried both; they give the same error and I can't see where they differ).

The problem: when I try to send a String through one of my mpsc tx clones in a scope that uses that library, the compiler complains again:

future cannot be sent between threads safely
within `impl futures::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `Rc<starship_battery::platform::linux::manager::SysFsManager>`

Can anybody suggest how to work around that issue? I'd prefer not to have to create my own thread-safe fork of that battery crate.

My code: src/main.rs · main · thomas351 / rust-influxdb-udp-logger · GitLab
I've also tried switching from tokio to a futures mpsc channel, which gives me the same error:
src/main.rs · use_futures_mpsc · thomas351 / rust-influxdb-udp-logger · GitLab

The easy way to work around issues like this is to create a new battery::Manager whenever you want to query it, and not to hold it (or anything borrowed from it) across an await point. The general idea in pseudocode is:

loop {
    // Collect state
    let mut state = vec![];
    if let Ok(manager) = Manager::new() {
        let batteries = ...;
        for battery in batteries {
            let voltage = ...;
            state.push(voltage);
        }
    }

    // Send state
    for voltage in state {
        send_sysvar(..., voltage, ...).await;
    }

    // Update every second
    // Note that your code uses `std::thread::sleep()` which is a bug
    tokio::time::sleep(Duration::from_secs(1)).await;
}

The key is splitting the "loop over await points" and "loop over batteries" so that you don't need to hold a reference to a non-Send type over the await point.
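
The same split can be sketched without the battery crate. This is a hedged sketch under stated assumptions: `FakeManager` is a hypothetical stand-in that holds an `Rc` (so it is `!Send`, like the real `Manager`), `send_value` is a hypothetical stand-in for `send_sysvar`, and `block_on` is a toy executor included only so the example runs without tokio. The `!Send` value is created, queried and dropped inside the synchronous `collect_state`, so only a `Vec<f32>` is alive across `.await`.

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for battery::Manager: the Rc makes it !Send.
struct FakeManager {
    voltages: Rc<Vec<f32>>,
}

impl FakeManager {
    fn new() -> Self {
        FakeManager { voltages: Rc::new(vec![12.5, 11.75]) }
    }
}

// Synchronous step: the manager lives and dies here, never across an .await.
fn collect_state() -> Vec<f32> {
    let manager = FakeManager::new();
    manager.voltages.iter().copied().collect()
}

// Hypothetical stand-in for send_sysvar(...): just reports success.
async fn send_value(_voltage: f32) -> bool {
    true
}

// One iteration of the polling loop: collect first, then await per value.
async fn poll_once() -> usize {
    let state = collect_state(); // Vec<f32> is Send
    let mut sent = 0;
    for voltage in state {
        if send_value(voltage).await {
            sent += 1;
        }
    }
    sent
}

// Compile-time check that a value is Send (the bound tokio::spawn demands).
fn assert_send<T: Send>(_: &T) {}

// Toy executor: busy-polls, which is fine here because nothing returns Pending.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In the real program `poll_once` would sit inside the `loop` with `tokio::time::sleep(...).await` between iterations; because the future is `Send`, it can be handed to `tokio::spawn`.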

Keep in mind that it may not be possible to make Manager thread-safe. But it can always be created on a dedicated system thread and queried with message passing, à la the actor pattern. It doesn't need to be thread-safe at all!
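
A minimal sketch of that actor approach, using only `std` so it runs without extra crates. Assumptions: `Resource` is a hypothetical `!Send` stand-in for `Manager`, and `Request`, `spawn_actor`, `query_sum` and `query_count` are illustrative names. The blocking `reply_rx.recv()` is fine on plain threads; inside async code you would more likely use `tokio::sync::mpsc` for requests and `tokio::sync::oneshot` for replies and `.await` the answer. Only `Request` (with its reply `Sender`) crosses threads, so only it has to be `Send`.

```rust
use std::rc::Rc;
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical !Send resource standing in for battery::Manager.
struct Resource {
    readings: Rc<Vec<u32>>,
}

// Messages sent to the actor. These must be Send; Resource does not need to be.
enum Request {
    Sum(Sender<u32>),
    Count(Sender<usize>),
}

// Starts the actor thread and returns the handle used to talk to it.
fn spawn_actor() -> Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        // Created on the actor thread itself, so it never crosses a thread boundary.
        let resource = Resource { readings: Rc::new(vec![10, 20, 30]) };
        // The loop ends once every Sender<Request> has been dropped.
        for request in rx {
            match request {
                Request::Sum(reply) => {
                    let _ = reply.send(resource.readings.iter().sum());
                }
                Request::Count(reply) => {
                    let _ = reply.send(resource.readings.len());
                }
            }
        }
    });
    tx
}

// Sends a request with a fresh reply channel and waits for the answer.
fn query_sum(actor: &Sender<Request>) -> u32 {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor.send(Request::Sum(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}

fn query_count(actor: &Sender<Request>) -> usize {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor.send(Request::Count(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

The `Sender<Request>` handle can be cloned and given to any number of tasks, while the resource itself is only ever touched by the actor thread.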

Thanks for the idea on how to work around that library not being thread-safe!

I'm not sure I understand this last sentence. Is that an alternative idea or a description of the pseudo-code block you gave above?

I'd prefer to only create that battery manager once, and I was hoping there was a way to do that without creating my own fork of this library.

Going by the Rust compiler's error message, the first step would be to replace the Rc with an Arc in https://github.com/starship/rust-battery/blob/main/src/types/manager.rs. To do that I'd need to create my own fork. Is there another way?

I found someone already made a fork doing just that: change Rc to Arc · keng9/rust-battery@5a3cc01 · GitHub. Though I have no idea how I could use it, since it doesn't seem to have been published as a crate on crates.io.
Maybe this could help? Overriding Dependencies - The Cargo Book

ChatGPT says it should be possible to access that private inner field using code like:

use my_library::{Bar, PrivateField};

struct Wrapper {
    bar: Bar,
}

impl Wrapper {
    pub fn get_private_field(&self) -> PrivateField {
        self.bar.private_field
    }
}

But today's Rust compiler says no.
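
The compiler rejects that because Rust enforces field privacy at the module boundary (and therefore also at the crate boundary): code outside the defining module cannot name a private field at all, however it is wrapped. Even if the field were visible, returning it by value from `&self` would only compile for a `Copy` type. A minimal sketch, where `my_library`, `Bar`, `value` and `Wrapper` are hypothetical names, showing the only route available from outside, namely whatever the library chooses to expose:

```rust
// Hypothetical library module; in the real case this would be another crate.
mod my_library {
    pub struct Bar {
        // Private: only code inside `my_library` may read or write it.
        private_field: u32,
    }

    impl Bar {
        pub fn new(value: u32) -> Self {
            Bar { private_field: value }
        }

        // The library decides whether to offer access like this.
        pub fn value(&self) -> u32 {
            self.private_field
        }
    }
}

use my_library::Bar;

struct Wrapper {
    bar: Bar,
}

impl Wrapper {
    fn get_value(&self) -> u32 {
        // `self.bar.private_field` would fail with E0616 (field is private);
        // the public accessor is the only option from outside the module.
        self.bar.value()
    }
}
```

Since `Manager` keeps its `Rc` in a private field and offers no way to swap it, changing it means changing the crate itself, which is why a fork loaded as a git dependency is the route taken below.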

Okay, so I found I can load the battery crate from git instead of crates.io using this dependency definition:

battery = { git = 'https://github.com/keng9/rust-battery', branch = 'master' }

But that fork doesn't compile:

   Compiling battery v0.7.8 (https://github.com/keng9/rust-battery?branch=master#3629fe9b)
error[E0412]: cannot find type `Arc` in this scope
  --> /home/thomas/.cargo/git/checkouts/rust-battery-a64787e962351703/3629fe9/battery/src/platform/linux/iterator.rs:11:14
   |
11 |     manager: Arc<SysFsManager>,
   |              ^^^ not found in this scope
   |
help: consider importing one of these items
   |
1  | use std::sync::Arc;
   |
1  | use uom::lib::sync::Arc;
   |

error[E0412]: cannot find type `Arc` in this scope
  --> /home/thomas/.cargo/git/checkouts/rust-battery-a64787e962351703/3629fe9/battery/src/platform/linux/iterator.rs:19:21
   |
19 |     fn new(manager: Arc<Self::Manager>) -> Result<Self> {
   |                     ^^^ not found in this scope
   |
help: consider importing one of these items
   |
1  | use std::sync::Arc;
   |
1  | use uom::lib::sync::Arc;
   |

So I guess the next step is to create my own fork of that crate on GitHub and make it compile ;)

Nice, that seems to work!

I've created my battery crate fork: GitHub - thomas725/rust-battery: Rust crate providing cross-platform information about the notebook batteries. (threadsafe)

and imported it in my rust-influxdb-udp-logger Cargo.toml like that:

starship-battery = { git = 'https://github.com/thomas725/rust-battery', branch = 'main' }

which now allows me to hold onto that battery manager reference while sending data through the mpsc channel.

Though it seems I misunderstood how to replace a tokio mpsc channel with the one from the futures crate, since the data_sender thread from the branch where I use the futures mpsc channel never processes any lines: src/main.rs · use_futures_mpsc · thomas351 / rust-influxdb-udp-logger · GitLab

So I guess I'll stick with the tokio version. I couldn't find out whether an mpsc channel from the futures crate would give me any advantages; I only tried it to see if it could work around the issue discussed here.

For clarity: yes, this was an alternative workaround. In that design you would follow the actor pattern, where a system thread holds a resource and provides access to it through message passing. The only thing that needs to be Send in this scenario is the message type.

Anyway I’m glad you found something that works for you.

I'd love to understand why my solution requires the battery manager instance to be sendable to another thread, just because I want to keep and reuse it after I've sent a string to the mpsc receiver running in another thread. In my mind the only thing that needs to be transferred to the other thread is that string, not everything in the sender's scope.

Sending a string is not the problem. This is the error message I get when I compile your crate at 87d7c10 on macOS:

error: future cannot be sent between threads safely
   --> src/main.rs:162:35
    |
162 |         handles.push(tokio::spawn(watch_batteries(args.hostname, tx, args.minimum_frequency)));
    |                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `watch_batteries` is not `Send`
    |
    = help: within `impl futures::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `Rc<battery::platform::darwin::manager::IoKitManager>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:285:170
    |
269 |     if let Ok(manager) = Manager::new() {
    |               ------- has type `Manager` which is not `Send`
...
285 |                         send_sysvar(&mut last_sent, "battery_voltage", voltage as f64, current_time, minimum_frequency, format!(" {hostname}.battery_voltage="), &tx_udp).awa...
    |                                                                                                                                                                          ^^^^^^ await occurs here, with `manager` maybe used later
...
292 |     }
    |     - `manager` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /Users/parasyte/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.23.0/src/task/spawn.rs:163:21
    |
163 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

This error points to Manager as the type which must implement Send. There's another error that points at good_bats because the platform-specific Battery type for macOS is also not Send (it holds a raw pointer). Not surprisingly, this is why I focused on Manager in my suggestions above.

There isn't any mention in the error messages of "string". Maybe it's different when building on other platforms, but I cannot see how you're getting stuck on strings being problematic.

Sorry, that was a misunderstanding. I get the same error on my Linux machine. It's just that all I'm trying to send to that other thread is a string generated by the format! macro (see line 370). I don't understand why the compiler says that the battery manager instance needs to implement Send, even though I'm only sending a String.

The Future needs to implement Send because you are spawning the task onto a threaded runtime. This means that there is a possibility that the Future will be polled in a different thread each time. The Send bound is how this requirement is encoded into the API: spawn in tokio - Rust

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,

cf.

162 |         handles.push(tokio::spawn(watch_batteries(args.hostname, tx, args.minimum_frequency)));
    |                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `watch_batteries` is not `Send`

This error has nothing at all to do with what you are awaiting, and everything to do with what the Future itself is composed of. Because you hold a !Send type (Manager) across an await point, the Future itself is !Send, thus the error.
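
To see where the bound comes from, here is a hedged, std-only sketch of a `spawn`-like function with the same `Send + 'static` bounds as `tokio::spawn`: it moves the future onto a freshly created OS thread, which is only sound if the future is `Send`. `spawn_on_thread`, `block_on` (a toy busy-polling executor) and `send_text` are hypothetical helpers, not tokio APIs. `scoped_rc` shows the usual fix inside a single async fn: confine the `!Send` value to an inner block that ends before the `.await`.

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls until Ready; fine here because nothing returns Pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Same bounds as tokio::spawn, because the future is moved to another thread.
fn spawn_on_thread<F>(fut: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || block_on(fut))
}

async fn send_text(text: String) -> usize {
    text.len()
}

// Accepted: the Rc is dropped at the end of the inner block, before .await.
async fn scoped_rc() -> usize {
    let text = {
        let shared = Rc::new(String::from("battery_voltage=12.5"));
        String::clone(&shared)
    }; // `shared` no longer exists here
    send_text(text).await
}

// Rejected if passed to spawn_on_thread, because `shared` is alive across .await:
// async fn held_rc() -> usize {
//     let shared = Rc::new(String::from("x"));
//     let n = send_text(String::clone(&shared)).await;
//     n + shared.len()
// }
```

Only the data held across an `.await` ends up inside the future's state, so what is being sent through the channel does not matter; what matters is what the future keeps alive while it is suspended.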

Ahh, I think I get it now. Is there a way to spawn an async task so that it can't be resumed by a different thread after an await point, but instead just blocks the thread until the await is done? You mentioned something about a system thread before; is that what you were talking about?

You have the following options:

  1. Never keep it across an .await. For example, you might ensure that the battery type is created, used, and destroyed entirely between two calls to .await. This is most easily done by putting all the work with it into a non-async function and calling that.
  2. Use a completely single-threaded runtime where nothing is moved across threads because there is only one. (This requires a LocalSet in Tokio.)
  3. Use std::thread::spawn to create a dedicated thread that is used only for managing this battery type, and communicate with it using tokio::sync::mpsc.
  4. Use a fork that uses Arc.
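
Option 2 works because the `Send` bound comes from moving a future between threads, not from `async` itself. A std-only sketch under that framing: `block_on` is a hypothetical toy executor that polls the future on the calling thread, so its signature has no `Send` bound, and a future that keeps an `Rc` across `.await` runs fine. In tokio the counterpart is `tokio::task::LocalSet` with `tokio::task::spawn_local`, whose bound is `F: Future + 'static` without `Send`. `send_text` and `held_rc` are hypothetical names.

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Runs a future to completion on the current thread. Note: no `Send` bound.
// Busy-polls, which is only acceptable because nothing here returns Pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

async fn send_text(text: String) -> usize {
    text.len()
}

// Keeps a !Send Rc alive across .await; fine as long as it never changes threads.
async fn held_rc() -> usize {
    let shared = Rc::new(String::from("battery_voltage=12.5"));
    let sent = send_text(String::clone(&shared)).await;
    sent + Rc::strong_count(&shared)
}
```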

I would like to add one more thing: be very, very careful with using std::sync::mpsc in async code. Its recv() blocks the whole thread until a message arrives, so a runtime worker thread waiting on it cannot run any other tasks in the meantime, which is pretty bad. See this article for more info on that.
