use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// A thread safe and easy to share queue
#[derive(Clone)]
struct SafeQueue<T>
where T: Send, {
//In this way, our Queue is a Send, and Sync’s
queue: Arc<Mutex<Vec<T>>>,
}
impl<T: Send> SafeQueue<T> {
// Create a safe queue
// The VEC of the queue implements send, Sync Trait
// and wrapped by Mutex
fn new() -> SafeQueue<T> {
SafeQueue {
queue: Arc::new(Mutex::new(Vec::new())),
}
}
fn empty(&self) -> bool {
let queue = self.queue.lock().unwrap();
queue.is_empty()
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
}
fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop()
}
}
// help to spawn sub thread
macro_rules! go {
($($body:tt)*) => {{
thread::spawn(move || {
$($body)*
});
}}
}
I have a test case that uses String,It worked as expected:
fn test_String_queue() {
// Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
let queue = Arc::new(SafeQueue::<String>::new());
// Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
let queue_clone = queue.clone();
go! {
for i in 0..100{
queue_clone.push("Send from sender1:".to_owned()+&i.to_string());
}
}
let queue_clone = queue.clone();
go! {
for i in 0..100{
queue_clone.push("Send from sender2:".to_owned()+&i.to_string());
}
}
let mut num = 0;
let queue_clone = queue.clone();
go! {
loop{
println!("Get From Thread {:?}", queue_clone.pop());
thread::sleep(time::Duration::from_millis(1));
}
}
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
// Sending data to the queue in the main thread
queue.push("Send from main:".to_string() + &num.to_string());
num += 1;
}
if queue.empty() {
break;
}
println!("Get From main {:?}", queue.pop());
}
}
But when I try to pass a closure, it fails to compile. How do I adjust it
fn test_FnOnce_queue() {
let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());
let queue_clone = queue.clone();
go! {
for i in 0..100{
queue_clone.push(Box::pin(||println!("Send from sender1:{}",&i.to_string())));
}
}
let queue_clone = queue.clone();
go! {
for i in 0..100{
queue_clone.push(Box::pin(||println!("Send from sender2:{}",&i.to_string())));
}
}
let mut num = 0;
let queue_clone = queue.clone();
go! {
loop{
(queue_clone.pop().unwrap())();
thread::sleep(time::Duration::from_millis(1));
}
}
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
queue.push(Box::pin(|| println!("Send from main:{}", &num.to_string())));
num += 1;
}
if queue.empty() {
break;
}
queue.pop();
}
}
ERROR:
error[E0599]: the function or associated item new exists for struct SafeQueue<dyn FnOnce() + Send + Sync>, but its trait bounds were not satisfied
--> src\main.rs:412:67
|
8 | struct SafeQueue
| ------------------- function or associated item new not found for this struct
...
412 | let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());
| ^^^ function or associated item cannot be called on SafeQueue<dyn FnOnce() + Send + Sync> due to unsatisfied trait bounds
|
::: C:\Users\Eurax.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\ops\function.rs:240:1
|
240 | pub trait FnOnce {
| ---------------------- doesn't satisfy dyn FnOnce() + Send + Sync: Sized
|
= note: the following trait bounds were not satisfied: dyn FnOnce() + Send + Sync: Sized
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// A thread safe and easy to share queue
#[derive(Clone)]
struct SafeQueue<T>
where T: Send, {
//In this way, our Queue is a Send, and Sync’s
queue: Arc<Mutex<Vec<T>>>,
}
impl<T: Send> SafeQueue<T> {
// Create a safe queue
// The VEC of the queue implements send, Sync Trait
// and wrapped by Mutex
fn new() -> SafeQueue<T> {
SafeQueue {
queue: Arc::new(Mutex::new(Vec::new())),
}
}
fn empty(&self) -> bool {
let queue = self.queue.lock().unwrap();
queue.is_empty()
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
}
fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop()
}
}
fn test_String_queue() {
// Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
let queue = Arc::new(SafeQueue::<String>::new());
// Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
println!("Get From Thread {:?}", queue_clone.pop());
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
// Sending data to the queue in the main thread
queue.push("Send from main:".to_string() + &num.to_string());
num += 1;
}
if queue.empty() {
break;
}
println!("Get From main {:?}", queue.pop());
}
}
fn test_FnOnce_queue() {
let queue = Arc::new(SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new());
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push(Box::new(move || println!("Send from sender1:{}", &i.to_string())));
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push(Box::new(move || println!("Send from sender2:{}", &i.to_string())));
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
if !queue_clone.empty() {
(queue_clone.pop().unwrap())();
}
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
queue.push(Box::new(move || println!("Send from main:{}", &num.to_string())));
num += 1;
}
if queue.empty() {
break;
}
if !queue.empty() {
(queue.pop().unwrap())();
}
}
}
fn main() {
test_String_queue();
test_FnOnce_queue();
}
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// A thread safe and easy to share queue
struct SafeQueue<T> {
//In this way, our Queue is a Send, and Sync’s
queue: Arc<Mutex<Vec<T>>>,
}
// use Send Clone for fix double Arc
impl<T: Send> Clone for SafeQueue<T> {
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
}
}
}
impl<T> SafeQueue<T> {
// Create a safe queue
// The VEC of the queue implements send, Sync Trait
// and wrapped by Mutex
fn new() -> SafeQueue<T> {
SafeQueue {
queue: Arc::new(Mutex::new(Vec::new())),
}
}
fn empty(&self) -> bool {
let queue = self.queue.lock().unwrap();
queue.is_empty()
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
}
fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop()
}
}
/// test case for String
fn test_string_queue() {
// Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
let queue = SafeQueue::<String>::new();
// Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
println!("Get From Thread {:?}", queue_clone.pop());
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
// Sending data to the queue in the main thread
queue.push("Send from main:".to_string() + &num.to_string());
num += 1;
}
if queue.empty() {
break;
}
println!("Get From main {:?}", queue.pop());
}
}
/// test case for dyn FnOnce
fn test_fn_once_queue() {
let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push(Box::new(move || println!("Send from sender1:{}", &i.to_string())));
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push(Box::new(move || println!("Send from sender2:{}", &i.to_string())));
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
if !queue_clone.empty() {
print!("Receive from thread:{:?} , ", (queue_clone.pop().unwrap())());
}
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
queue.push(Box::new(move || println!("Send from main:{}", &num.to_string())));
num += 1;
}
if queue.empty() {
break;
}
if !queue.empty() {
print!("Receive from main:{:?} , ", (queue.pop().unwrap())());
}
}
}
fn main() {
test_string_queue();
test_fn_once_queue();
}
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// A thread safe and easy to share queue
struct SafeQueue<T> {
//In this way, our Queue is a Send, and Sync’s
queue: Arc<Mutex<Vec<T>>>,
}
// use Send Clone for fix double Arc
impl<T> Clone for SafeQueue<T> {
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
}
}
}
impl<T> SafeQueue<T> {
// Create a safe queue
// The VEC of the queue implements send, Sync Trait
// and wrapped by Mutex
fn new() -> SafeQueue<T> {
SafeQueue {
queue: Arc::new(Mutex::new(Vec::new())),
}
}
fn empty(&self) -> bool {
let queue = self.queue.lock().unwrap();
queue.is_empty()
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
}
fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop()
}
}
/// test case for String
fn test_string_queue() {
// Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
let queue = SafeQueue::<String>::new();
// Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
println!("Get From Thread {:?}", queue_clone.pop());
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
// Sending data to the queue in the main thread
queue.push("Send from main:".to_string() + &num.to_string());
num += 1;
}
if queue.empty() {
break;
}
println!("Get From main {:?}", queue.pop());
}
}
/// test case for dyn FnOnce
fn test_fn_once_queue() {
let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push(Box::new(move || println!("Send from sender1:{}", &i.to_string())));
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push(Box::new(move || println!("Send from sender2:{}", &i.to_string())));
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
if !queue_clone.empty() {
print!("Receive from thread:{:?} , ", (queue_clone.pop().unwrap())());
}
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
queue.push(Box::new(move || println!("Send from main:{}", &num.to_string())));
num += 1;
}
if queue.empty() {
break;
}
if !queue.empty() {
print!("Receive from main:{:?} , ", (queue.pop().unwrap())());
}
}
}
fn main() {
test_string_queue();
test_fn_once_queue();
}
Oh, that is interesting! Yes, the manual implementation is a little less concerned with what T is (and rightly so). Derive macros unfortunately cannot peek inside and see that it's just an Arc.