Nested threads and `dyn std::ops::Fn()` cannot be sent between threads safely

I'm trying to pass a callback to a thread inside another thread. the callback eventually calls channels sender.send. however what i get is dyn std::ops::Fn() cannot be sent between threads safely. if i replace the callback with the sender variable it throws the same error but this time for std::sync::mpsc::sender
What is the proper way of achieving this?

	use std::thread;
use std::sync::mpsc::{channel, Sender};
use std::time::Duration;

type CallbackType = dyn Fn(&str, Sender<&'static str>);

fn relay(callback: Box<dyn Fn()>) {
	thread::spawn(move || {
		callback();
	});
}

fn mycallback(text: Box<&'static str>, sender: Sender<&'static str>) {
	sender.send(text.as_ref());
}

fn main() {
	let (sender, receiver) = channel();
	let sender2 = sender.clone();
	thread::spawn(move || {
		sender2.send("level 1");
		relay(Box::new(move || mycallback(Box::new("internal hello"), sender2)));
	});
	thread::spawn(move || {
		loop {
			println!("{}", receiver.recv().unwrap()); // Received immediately
		}
	});
	loop {
		thread::sleep(Duration::from_secs(2)); // block for two seconds
	}
}
1 Like

By using the special dyn Trait type called a "trait object", you are forgetting everything about the underlying type, except what you explicitly mentioned in the trait object directly. Thus if you don't specify that it can be sent across threads, then the compiler forgets whether it is safe to do so.

// you want to specify that it can be sent across threads:
dyn Fn(...) + Send
2 Likes

Thank you, i added Send as you suggested and i no longer get that error but it got another one cannot move out of sender2, a captured variable in an Fn closure src\main.rs:22:71
I used box get rid of that error but it caused another one "constructor is not visible here due to private fields" it complaints about the same line but this part : Box.new(sender2))));
I'm guessing that means i can't use box for sender, FnBox is also not an option since it is only available in nightly. I'm out of ideas right now :confused:

	use std::thread;
use std::sync::mpsc::{channel, Sender};
use std::time::Duration;

type CallbackType = dyn Fn(&str, Sender<&'static str>);

fn relay(callback: Box<dyn Fn() + Send + 'static>) {
	thread::spawn(move || {
		callback();
	});
}

fn mycallback(text: Box<&'static str>, sender: Box<Sender<&'static str>>) {
	sender.as_ref().send(text.as_ref());
}

fn main() {
	let (sender, receiver) = channel();
	let sender2 = sender.clone();
	thread::spawn(move || {
		sender2.send("level 1");
		relay(Box::new(move || mycallback(Box::new("internal hello"), Box.new(sender2))));
	});
	thread::spawn(move || {
		loop {
			println!("{}", receiver.recv().unwrap()); // Received immediately
		}
	});
	loop {
		thread::sleep(Duration::from_secs(2)); // block for two seconds
	}
}

Uhh, why are you putting the channels in boxes?

If i don't i'll get the following error

Do you really need Fn? That means, do you really need to call the function several times?

In the program i'm writing threads listen on different events so they'll run forever and will call the callback many times.
The program is opensource actually https://github.com/sinasalek/clipboard_guardian

The "constructor is not visible" is likely due to the fact that you are using Box.new() rather than Box::new(). However, I don't think using Box fixes the underlying issue there.

I think the problem is that sender2 is moved into the closure passed to relay(), and then moved into the mycallback() function. That moves it out of the closure passed to relay(), meaning it is invalid if you want to call the closure again (which Fn allows, hence Cerberuser's question).

I suspect what you want to do is clone sender2 inside the closure passed to relay() to create a sender3, which you can then pass to mycallback. That would mean the sender is cloned, rather than moved, every time the closure is called.

An alternative (and perhaps better) option would be to make the sender argument of mycallback() into a &Sender, so it won't take ownership. That should work, since nothing I can see requires that it take ownership.

1 Like

Thanks a lot man, that fixed the problem. I tried to incorporate the changes into the my program but i hit another issue with thread-safety after i tried to use the callback with periodic::Planner

dyn std::ops::Fn() + std::marker::Send cannot be shared between threads safely
--> src\main.rs:9:13

I tried adding Sync to CallbackType but when i do that i get the following error, it complains about &sender2:

std::sync::mpsc::Sender<&str> cannot be shared between threads safely

use std::thread;
use std::sync::mpsc::{channel, Sender};
use std::time::Duration;

type CallbackType = Box<dyn Fn() + Send + 'static>;

fn relay(callback: CallbackType) {
	let mut planner = periodic::Planner::new();
	planner.add(
		|| callback(),
		periodic::After::new(Duration::from_secs(5)),
	);
	planner.start();
	drop(planner)
}

fn mycallback(text: Box<&'static str>, sender: &Sender<&'static str>) {
	sender.send(text.as_ref());
}

fn main() {
	let (sender, receiver) = channel();
	let sender2 = sender.clone();
	thread::spawn(move || {
		sender2.send("level 1");
		relay(Box::new(move || mycallback(Box::new("internal hello"), &sender2)));
	});
	thread::spawn(move || {
		loop {
			println!("{}", receiver.recv().unwrap()); // Received immediately
		}
	});
	loop {
		thread::sleep(Duration::from_secs(2)); // block for two seconds
	}
}

Use crossbeam's channel. It is Sync.

2 Likes

Thank you guys
here is the working version

use std::thread;
use std::time::Duration;
use crossbeam::channel::{unbounded, Sender};

type CallbackType = Box<dyn Fn() + Send + Sync + 'static>;

fn relay(callback: CallbackType) {
	let mut planner = periodic::Planner::new();
	planner.add(
		move || callback(),
		periodic::After::new(Duration::from_secs(5)),
	);
	planner.start();
	drop(planner)
}

fn mycallback(text: Box<&'static str>, sender: &Sender<&'static str>) {
	sender.send(text.as_ref());
}

fn main() {
	let (sender, receiver) = unbounded();
	let sender2 = sender.clone();
	thread::spawn(move || {
		sender2.send("level 1");
		relay(Box::new(move || mycallback(Box::new("internal hello"), &sender2)));
	});
	thread::spawn(move || {
		loop {
			println!("{}", receiver.recv().unwrap()); // Received immediately
		}
	});
	loop {
		thread::sleep(Duration::from_secs(2)); // block for two seconds
	}
}

You should not put the &'static str type in a box. That's kinda pointless.

1 Like

Nice catch @alice i removed it and it still works.
Is that because rust automatically makes it static when there dyn (trait objedts) so using 'static here was redundant?

A &'static str is not a trait object. It is simply a reference to a string slice stored in an immutable global.

2 Likes

@alice it seems mpsc also support sync channel

SyncSender in std::sync::mpsc - Rust

In this case the difference is whether the channel is bounded or not, and has nothing to do with the Sync trait. That SyncSender implements Sync and Sender doesn't is a coincidence, and if you want an unbounded channel with a sender that implements Sync, you'll need crossbeam.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.