I want to create a simple and clear queue

My Queue:


use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A cloneable handle to a mutex-protected `Vec` that several threads can share.
///
/// Every clone produced by the derived `Clone` refers to the same underlying
/// buffer through the inner `Arc`.
#[derive(Clone)]
struct SafeQueue<T: Send> {
    // `Arc` shares ownership between handles; `Mutex` serializes access to the `Vec`.
    queue: Arc<Mutex<Vec<T>>>,
}

impl<T: Send> SafeQueue<T> {
    /// Builds a handle around a fresh, empty buffer.
    fn new() -> Self {
        let storage = Mutex::new(Vec::new());
        Self {
            queue: Arc::new(storage),
        }
    }

    /// Reports whether the buffer currently holds no items.
    ///
    /// Panics if the mutex is poisoned.
    fn empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Appends `item` at the back of the buffer.
    ///
    /// Panics if the mutex is poisoned.
    fn push(&self, item: T) {
        self.queue.lock().unwrap().push(item);
    }

    /// Removes and returns the most recently pushed item, or `None` when the
    /// buffer is empty.
    ///
    /// Panics if the mutex is poisoned.
    fn pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop()
    }
}

// Runs the given statements on a freshly spawned thread, moving every captured
// variable into it. The `JoinHandle` is dropped right away, so the thread is
// detached and cannot be joined.
macro_rules! go {
    ($($stmts:tt)*) => {{
        let _detached = thread::spawn(move || {
            $($stmts)*
        });
    }}
}

I have a test case that uses String, and it works as expected:

/// Demo: two detached producers, one detached consumer and the main thread
/// all work on one shared queue of `String`s.
fn test_String_queue() {
    // One queue, shared between threads through an outer `Arc`.
    let queue = Arc::new(SafeQueue::<String>::new());

    // Each producer owns its own `Arc` handle and pushes 100 labelled messages.
    for prefix in ["Send from sender1:", "Send from sender2:"] {
        let producer = Arc::clone(&queue);
        go! {
            for i in 0..100 {
                let mut message = prefix.to_owned();
                message.push_str(&i.to_string());
                producer.push(message);
            }
        }
    }

    // The consumer polls forever and prints whatever `pop` returns,
    // including `None` while the queue is empty.
    let consumer = Arc::clone(&queue);
    go! {
        loop {
            let received = consumer.pop();
            println!("Get From Thread {:?}", received);
            thread::sleep(time::Duration::from_millis(1));
        }
    }

    // The main thread sends up to 100 messages of its own, one per tick, and
    // stops as soon as it observes an empty queue.
    let tick = time::Duration::from_millis(1);
    let mut sent = 0;
    loop {
        thread::sleep(tick);
        if sent < 100 {
            queue.push("Send from main:".to_string() + &sent.to_string());
            sent += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

But when I try to pass a closure, it fails to compile. How do I fix it?

/// Attempted closure variant of the queue demo.
///
/// NOTE: this version does not compile. The element type is written as the
/// bare trait object `dyn FnOnce() + Send + Sync`, which is unsized, so
/// `SafeQueue::new` is not available for it (error E0599, `Sized` bound
/// not satisfied).
fn test_FnOnce_queue() {
    // The element type must be a sized type such as a boxed trait object.
    let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());

    // First producer: pushes 100 closures.
    let queue_clone = queue.clone();
    go! {
    for i in 0..100{
        // `Box::pin` yields a `Pin<Box<_>>`, not the declared element type.
        queue_clone.push(Box::pin(||println!("Send from sender1:{}",&i.to_string())));
    }
}

    // Second producer: same as the first, with a different label.
    let queue_clone = queue.clone();
    go! {
    for i in 0..100{
        queue_clone.push(Box::pin(||println!("Send from sender2:{}",&i.to_string())));
    }
}

    let mut num = 0;

    // Consumer: pops and calls a closure every millisecond. `unwrap` is
    // applied to the result of `pop`, which is `None` when the queue is empty.
    let queue_clone = queue.clone();
    go! {
    loop{
        (queue_clone.pop().unwrap())();
        thread::sleep(time::Duration::from_millis(1));
    }
}
    // Main thread: pushes up to 100 closures, one per tick, and leaves the
    // loop once it observes an empty queue.
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::pin(|| println!("Send from main:{}", &num.to_string())));
            num += 1;
        }
        if queue.empty() {
            break;
        }
        // The popped closure is discarded here without being called.
        queue.pop();
    }
}

ERROR:
error[E0599]: the function or associated item new exists for struct SafeQueue<dyn FnOnce() + Send + Sync>, but its trait bounds were not satisfied
--> src\main.rs:412:67
|
8 | struct SafeQueue
| ------------------- function or associated item new not found for this struct
...
412 | let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());
| ^^^ function or associated item cannot be called on SafeQueue<dyn FnOnce() + Send + Sync> due to unsatisfied trait bounds
|
::: C:\Users\Eurax.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\ops\function.rs:240:1
|
240 | pub trait FnOnce {
| ---------------------- doesn't satisfy dyn FnOnce() + Send + Sync: Sized
|
= note: the following trait bounds were not satisfied:
dyn FnOnce() + Send + Sync: Sized

You're boxing the closures, so you need to reflect that in the type signature:

SafeQueue::<Pin<Box<dyn FnOnce() + Send + Sync>>>::new())

I'm not sure why you're using Pin here though, you probably don't need it

1 Like

Thanks, I fixed it:

use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A cloneable handle to a mutex-protected `Vec` that several threads can share.
///
/// Every clone produced by the derived `Clone` refers to the same underlying
/// buffer through the inner `Arc`.
#[derive(Clone)]
struct SafeQueue<T: Send> {
    // `Arc` shares ownership between handles; `Mutex` serializes access to the `Vec`.
    queue: Arc<Mutex<Vec<T>>>,
}

impl<T: Send> SafeQueue<T> {
    /// Builds a handle around a fresh, empty buffer.
    fn new() -> Self {
        let storage = Mutex::new(Vec::new());
        Self {
            queue: Arc::new(storage),
        }
    }

    /// Reports whether the buffer currently holds no items.
    ///
    /// Panics if the mutex is poisoned.
    fn empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Appends `item` at the back of the buffer.
    ///
    /// Panics if the mutex is poisoned.
    fn push(&self, item: T) {
        self.queue.lock().unwrap().push(item);
    }

    /// Removes and returns the most recently pushed item, or `None` when the
    /// buffer is empty.
    ///
    /// Panics if the mutex is poisoned.
    fn pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop()
    }
}


fn test_String_queue() {
    // Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
    let queue = Arc::new(SafeQueue::<String>::new());

    // Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            println!("Get From Thread {:?}", queue_clone.pop());
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            // Sending data to the queue in the main thread
            queue.push("Send from main:".to_string() + &num.to_string());
            num += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

/// Demo: the queue stores boxed `FnOnce` closures instead of strings.
///
/// Two detached producer threads and the main thread each push 100 closures.
/// A detached consumer thread and the main thread pop closures and call them.
/// The function returns once the main thread finds the queue empty.
fn test_FnOnce_queue() {
    let queue = Arc::new(SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new());

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            // `move` copies `i` into the closure so it can outlive this iteration.
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || loop {
        // Test and take in a single `pop`: checking `empty()` first and then
        // calling `pop().unwrap()` can panic when another thread removes the
        // last item between the two calls.
        if let Some(task) = queue_clone.pop() {
            task();
        }
        thread::sleep(time::Duration::from_millis(1));
    });

    let mut num = 0;
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", num)));
            num += 1;
        }
        // Same single-step pop as in the consumer; an empty queue ends the demo.
        match queue.pop() {
            Some(task) => task(),
            None => break,
        }
    }
}

/// Runs the `String` queue demo first, then the boxed-closure queue demo.
fn main() {
    test_String_queue();
    test_FnOnce_queue();
}

The API presented is a stack (push and pop) not a queue (FIFO or double-ended). For a queue, std::collections::VecDeque is a good building block.

The double-Arc is not recommended. You can instead implement Clone:

impl<T: Send> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

Now you can just clone the queue all you want, and it defers to the internal Arc::clone.

i will try

i fix:


use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A thread-safe FIFO queue that is cheap to share between threads.
///
/// Cloning a `SafeQueue` produces another handle to the same underlying
/// buffer, so callers do not need to wrap it in an outer `Arc`.
struct SafeQueue<T> {
    // `Arc` shares the buffer between handles and `Mutex` serializes access.
    // A `VecDeque` allows removal at the front, which `pop` uses to hand
    // items back in the order they were pushed.
    queue: Arc<Mutex<std::collections::VecDeque<T>>>,
}

// Manual `Clone` so that cloning only clones the inner `Arc` and places no
// `Clone` requirement on `T`.
impl<T: Send> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
        }
    }
}

impl<T> SafeQueue<T> {
    /// Creates an empty queue.
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(std::collections::VecDeque::new())),
        }
    }

    /// Returns `true` when the queue currently holds no items.
    ///
    /// Another thread may push or pop right after this returns, so the
    /// answer is only a snapshot.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    /// Appends `item` at the back of the queue.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        // Take from the front so items leave in insertion (FIFO) order.
        queue.pop_front()
    }
}

/// Demo with `String` items: two detached producers, one detached consumer
/// and the main thread all work on the same queue.
fn test_string_queue() {
    // Clones of the queue share one buffer, so each thread gets its own clone.
    let queue = SafeQueue::<String>::new();

    // Each producer pushes 100 labelled messages.
    for prefix in ["Send from sender1:", "Send from sender2:"] {
        let producer = queue.clone();
        thread::spawn(move || {
            for i in 0..100 {
                let mut message = prefix.to_owned();
                message.push_str(&i.to_string());
                producer.push(message);
            }
        });
    }

    // The consumer polls forever and prints whatever `pop` returns,
    // including `None` while the queue is empty. Its handle is dropped, so
    // the thread is detached.
    let consumer = queue.clone();
    thread::spawn(move || loop {
        let received = consumer.pop();
        println!("Get From Thread {:?}", received);
        thread::sleep(time::Duration::from_millis(1));
    });

    // The main thread sends up to 100 messages of its own, one per tick, and
    // stops as soon as it observes an empty queue.
    let tick = time::Duration::from_millis(1);
    let mut sent = 0;
    loop {
        thread::sleep(tick);
        if sent < 100 {
            queue.push("Send from main:".to_string() + &sent.to_string());
            sent += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

/// Demo with boxed `FnOnce` closures as the queue's element type.
///
/// Two detached producer threads and the main thread each push 100 closures.
/// A detached consumer thread and the main thread pop closures, call them and
/// print the (unit) result. The function returns once the main thread finds
/// the queue empty.
fn test_fn_once_queue() {
    let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            // `move` copies `i` into the closure so it can outlive this iteration.
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || loop {
        // Test and take in a single `pop`: checking `empty()` first and then
        // calling `pop().unwrap()` can panic when another thread removes the
        // last item between the two calls.
        if let Some(task) = queue_clone.pop() {
            print!("Receive from thread:{:?} , ", task());
        }
        thread::sleep(time::Duration::from_millis(1));
    });

    let mut num = 0;
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", num)));
            num += 1;
        }
        // Same single-step pop as in the consumer; an empty queue ends the demo.
        match queue.pop() {
            Some(task) => print!("Receive from main:{:?} , ", task()),
            None => break,
        }
    }
}

/// Runs the `String` queue demo first, then the boxed-closure queue demo.
fn main() {
    test_string_queue();
    test_fn_once_queue();
}

thx

1 Like

Since you removed the T: Send bound in the rest of the code, you won't need it for the Clone impl, either. :+1:

And it's easy to derive the trait implementation, which Clippy will probably tell you about. The whole block can be replaced with:

// Derived `Clone` in place of the hand-written impl. Note that the derive
// also adds a `T: Clone` requirement to the generated impl.
#[derive(Clone)]
struct SafeQueue<T> {
    // `Arc<Mutex<..>>` lets every clone share one lock-protected `Vec`.
    queue: Arc<Mutex<Vec<T>>>,
}
1 Like

i fix:

use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A thread-safe FIFO queue that is cheap to share between threads.
///
/// Cloning a `SafeQueue` produces another handle to the same underlying
/// buffer, so callers do not need to wrap it in an outer `Arc`.
struct SafeQueue<T> {
    // `Arc` shares the buffer between handles and `Mutex` serializes access.
    // A `VecDeque` allows removal at the front, which `pop` uses to hand
    // items back in the order they were pushed.
    queue: Arc<Mutex<std::collections::VecDeque<T>>>,
}

// Manual `Clone` (rather than `#[derive(Clone)]`) so that cloning works for
// any `T`, including element types that are not `Clone` themselves.
impl<T> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
        }
    }
}

impl<T> SafeQueue<T> {
    /// Creates an empty queue.
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(std::collections::VecDeque::new())),
        }
    }

    /// Returns `true` when the queue currently holds no items.
    ///
    /// Another thread may push or pop right after this returns, so the
    /// answer is only a snapshot.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    /// Appends `item` at the back of the queue.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        // Take from the front so items leave in insertion (FIFO) order.
        queue.pop_front()
    }
}

/// Demo with `String` items: two detached producers, one detached consumer
/// and the main thread all work on the same queue.
fn test_string_queue() {
    // Clones of the queue share one buffer, so each thread gets its own clone.
    let queue = SafeQueue::<String>::new();

    // Each producer pushes 100 labelled messages.
    for prefix in ["Send from sender1:", "Send from sender2:"] {
        let producer = queue.clone();
        thread::spawn(move || {
            for i in 0..100 {
                let mut message = prefix.to_owned();
                message.push_str(&i.to_string());
                producer.push(message);
            }
        });
    }

    // The consumer polls forever and prints whatever `pop` returns,
    // including `None` while the queue is empty. Its handle is dropped, so
    // the thread is detached.
    let consumer = queue.clone();
    thread::spawn(move || loop {
        let received = consumer.pop();
        println!("Get From Thread {:?}", received);
        thread::sleep(time::Duration::from_millis(1));
    });

    // The main thread sends up to 100 messages of its own, one per tick, and
    // stops as soon as it observes an empty queue.
    let tick = time::Duration::from_millis(1);
    let mut sent = 0;
    loop {
        thread::sleep(tick);
        if sent < 100 {
            queue.push("Send from main:".to_string() + &sent.to_string());
            sent += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

/// Demo with boxed `FnOnce` closures as the queue's element type.
///
/// Two detached producer threads and the main thread each push 100 closures.
/// A detached consumer thread and the main thread pop closures, call them and
/// print the (unit) result. The function returns once the main thread finds
/// the queue empty.
fn test_fn_once_queue() {
    let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            // `move` copies `i` into the closure so it can outlive this iteration.
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || loop {
        // Test and take in a single `pop`: checking `empty()` first and then
        // calling `pop().unwrap()` can panic when another thread removes the
        // last item between the two calls.
        if let Some(task) = queue_clone.pop() {
            print!("Receive from thread:{:?} , ", task());
        }
        thread::sleep(time::Duration::from_millis(1));
    });

    let mut num = 0;
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", num)));
            num += 1;
        }
        // Same single-step pop as in the consumer; an empty queue ends the demo.
        match queue.pop() {
            Some(task) => print!("Receive from main:{:?} , ", task()),
            None => break,
        }
    }
}

/// Runs the `String` queue demo first, then the boxed-closure queue demo.
fn main() {
    test_string_queue();
    test_fn_once_queue();
}

I tried that, but because I use Box<dyn FnOnce() + Send + Sync> as the item type, the derived Clone causes a compile error.

The current result is very much in line with my expectations.

Oh, that is interesting! Yes, the manual implementation is a little less concerned with what T is (and rightly so). Derive macros unfortunately cannot peek inside and see that it's just an Arc.