I want to create a simple and clear queue

My Queue:


use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A thread safe and easy to share queue
#[derive(Clone)]
struct SafeQueue<T>
    where T: Send, {
    //In this way, our Queue is a Send, and Sync’s
    queue: Arc<Mutex<Vec<T>>>,
}

impl<T: Send> SafeQueue<T> {
    // Create a safe queue
    // The VEC of the queue implements send, Sync Trait
    // and wrapped by Mutex
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push(item);
    }

    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop()
    }
}

// help to spawn sub thread
macro_rules! go {
    ($($body:tt)*) => {{
        thread::spawn(move || {
            $($body)*
        });
    }}
}

I have a test case that uses String,It worked as expected:

fn test_String_queue() {
    // Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
    let queue = Arc::new(SafeQueue::<String>::new());

    // Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
    let queue_clone = queue.clone();
    go! {
        for i in 0..100{
            queue_clone.push("Send from sender1:".to_owned()+&i.to_string());
        }
    }

    let queue_clone = queue.clone();
    go! {
        for i in 0..100{
            queue_clone.push("Send from sender2:".to_owned()+&i.to_string());
        }
    }

    let mut num = 0;

    let queue_clone = queue.clone();
    go! {
        loop{
            println!("Get From Thread {:?}", queue_clone.pop());
            thread::sleep(time::Duration::from_millis(1));
        }
    }
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            // Sending data to the queue in the main thread
            queue.push("Send from main:".to_string() + &num.to_string());
            num += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

But when I try to pass a closure, it fails to compile. How do I adjust it

fn test_FnOnce_queue() {
    let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());

    let queue_clone = queue.clone();
    go! {
    for i in 0..100{
        queue_clone.push(Box::pin(||println!("Send from sender1:{}",&i.to_string())));
    }
}

    let queue_clone = queue.clone();
    go! {
    for i in 0..100{
        queue_clone.push(Box::pin(||println!("Send from sender2:{}",&i.to_string())));
    }
}

    let mut num = 0;

    let queue_clone = queue.clone();
    go! {
    loop{
        (queue_clone.pop().unwrap())();
        thread::sleep(time::Duration::from_millis(1));
    }
}
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::pin(|| println!("Send from main:{}", &num.to_string())));
            num += 1;
        }
        if queue.empty() {
            break;
        }
        queue.pop();
    }
}

ERROR:
error[E0599]: the function or associated item new exists for struct SafeQueue<dyn FnOnce() + Send + Sync>, but its trait bounds were not satisfied
--> src\main.rs:412:67
|
8 | struct SafeQueue
| ------------------- function or associated item new not found for this struct
...
412 | let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());
| ^^^ function or associated item cannot be called on SafeQueue<dyn FnOnce() + Send + Sync> due to unsatisfied trait bounds
|
::: C:\Users\Eurax.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\ops\function.rs:240:1
|
240 | pub trait FnOnce {
| ---------------------- doesn't satisfy dyn FnOnce() + Send + Sync: Sized
|
= note: the following trait bounds were not satisfied:
dyn FnOnce() + Send + Sync: Sized

You're boxing the closures, you need to reflect that in the type signature

SafeQueue::<Pin<Box<dyn FnOnce() + Send + Sync>>>::new())

I'm not sure why you're using Pin here though, you probably don't need it

1 Like

thx,i fix

use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A thread safe and easy to share queue
#[derive(Clone)]
struct SafeQueue<T>
    where T: Send, {
    //In this way, our Queue is a Send, and Sync’s
    queue: Arc<Mutex<Vec<T>>>,
}

impl<T: Send> SafeQueue<T> {
    // Create a safe queue
    // The VEC of the queue implements send, Sync Trait
    // and wrapped by Mutex
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push(item);
    }

    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop()
    }
}


fn test_String_queue() {
    // Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
    let queue = Arc::new(SafeQueue::<String>::new());

    // Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            println!("Get From Thread {:?}", queue_clone.pop());
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            // Sending data to the queue in the main thread
            queue.push("Send from main:".to_string() + &num.to_string());
            num += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

fn test_FnOnce_queue() {
    let queue = Arc::new(SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new());

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", &i.to_string())));
        }
    });


    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", &i.to_string())));
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            if !queue_clone.empty() {
                (queue_clone.pop().unwrap())();
            }
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", &num.to_string())));
            num += 1;
        }
        if queue.empty() {
            break;
        }
        if !queue.empty() {
            (queue.pop().unwrap())();
        }
    }
}

fn main() {
    test_String_queue();
    test_FnOnce_queue();
}

The API presented is a stack (push and pop) not a queue (FIFO or double-ended). For a queue, std::collections::VecDeque is a good building block.

The double-Arc is not recommended. You can instead implement Clone:

impl<T: Send> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

Now you can just clone the queue all you want, and it defers to the internal Arc::clone.

i will try

i fix:


use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A thread safe and easy to share queue
struct SafeQueue<T> {
    //In this way, our Queue is a Send, and Sync’s
    queue: Arc<Mutex<Vec<T>>>,
}
// use Send Clone for fix double Arc
impl<T: Send> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

impl<T> SafeQueue<T> {
    // Create a safe queue
    // The VEC of the queue implements send, Sync Trait
    // and wrapped by Mutex
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push(item);
    }

    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop()
    }
}

/// test case for String
fn test_string_queue() {
    // Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
    let queue = SafeQueue::<String>::new();

    // Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            println!("Get From Thread {:?}", queue_clone.pop());
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            // Sending data to the queue in the main thread
            queue.push("Send from main:".to_string() + &num.to_string());
            num += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

/// test case for dyn FnOnce
fn test_fn_once_queue() {
    let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", &i.to_string())));
        }
    });


    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", &i.to_string())));
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            if !queue_clone.empty() {
                print!("Receive from thread:{:?} , ", (queue_clone.pop().unwrap())());
            }
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", &num.to_string())));
            num += 1;
        }
        if queue.empty() {
            break;
        }
        if !queue.empty() {
            print!("Receive from main:{:?} , ", (queue.pop().unwrap())());
        }
    }
}

fn main() {
    test_string_queue();
    test_fn_once_queue();
}

thx

1 Like

Since you removed the T: Send bound in the rest of the code, you won't need it for the Clone impl, either. :+1:

And it's easy to derive the trait implementation, which Clippy will probably tell you about. The whole block can be replaced with:

#[derive(Clone)]
struct SafeQueue<T> {
    //In this way, our Queue is a Send, and Sync’s
    queue: Arc<Mutex<Vec<T>>>,
}
1 Like

i fix:

use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A thread safe and easy to share queue
struct SafeQueue<T> {
    //In this way, our Queue is a Send, and Sync’s
    queue: Arc<Mutex<Vec<T>>>,
}
// use Send Clone for fix double Arc
impl<T> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

impl<T> SafeQueue<T> {
    // Create a safe queue
    // The VEC of the queue implements send, Sync Trait
    // and wrapped by Mutex
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push(item);
    }

    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop()
    }
}

/// test case for String
fn test_string_queue() {
    // Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
    let queue = SafeQueue::<String>::new();

    // Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            println!("Get From Thread {:?}", queue_clone.pop());
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            // Sending data to the queue in the main thread
            queue.push("Send from main:".to_string() + &num.to_string());
            num += 1;
        }
        if queue.empty() {
            break;
        }
        println!("Get From main {:?}", queue.pop());
    }
}

/// test case for dyn FnOnce
fn test_fn_once_queue() {
    let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", &i.to_string())));
        }
    });


    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", &i.to_string())));
        }
    });

    let mut num = 0;

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            if !queue_clone.empty() {
                print!("Receive from thread:{:?} , ", (queue_clone.pop().unwrap())());
            }
            thread::sleep(time::Duration::from_millis(1));
        }
    });
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", &num.to_string())));
            num += 1;
        }
        if queue.empty() {
            break;
        }
        if !queue.empty() {
            print!("Receive from main:{:?} , ", (queue.pop().unwrap())());
        }
    }
}

fn main() {
    test_string_queue();
    test_fn_once_queue();
}

i try that,but i use Box<dyn FnOnce()+Send+Sync>,that will compile error

The current result is very in line with my expectations

Oh, that is interesting! Yes, the manual implementation is a little less concerned with what T is (and rightly so). Derive macros unfortunately cannot peek inside and see that it's just an Arc.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.