My Queue:
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// Thread-safe container that is cheap to share between threads.
///
/// Items live in a `Vec` guarded by a `Mutex` behind an `Arc`. `push` appends to the end
/// of the `Vec` and `pop` removes from that same end, so the most recently pushed item
/// comes out first.
#[derive(Clone)]
struct SafeQueue<T>
where
    T: Send,
{
    // The `Arc<Mutex<..>>` wrapper is what lets several threads share and mutate the buffer.
    queue: Arc<Mutex<Vec<T>>>,
}

impl<T: Send> SafeQueue<T> {
    /// Builds a container with no items in it.
    fn new() -> SafeQueue<T> {
        let storage = Vec::new();
        SafeQueue {
            queue: Arc::new(Mutex::new(storage)),
        }
    }

    /// Reports whether the container currently holds no items.
    ///
    /// Panics if the mutex has been poisoned.
    fn empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Appends `item` at the end of the buffer.
    ///
    /// Panics if the mutex has been poisoned.
    fn push(&self, item: T) {
        self.queue.lock().unwrap().push(item);
    }

    /// Removes and returns the item at the end of the buffer, or `None` when it is empty.
    ///
    /// Panics if the mutex has been poisoned.
    fn pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop()
    }
}
// Spawns a detached thread that runs the given statements.
// The closure is `move`, so every variable the statements mention is moved into the new
// thread. The returned `JoinHandle` is discarded, so the thread is never joined.
macro_rules! go {
    ($($stmts:tt)*) => {{
        let _ = thread::spawn(move || {
            $($stmts)*
        });
    }};
}
I have a test case that uses String, and it works as expected:
/// Exercises `SafeQueue<String>` from several threads at once.
///
/// Two detached producer threads push 100 strings each, a detached consumer thread pops and
/// prints in an endless loop (sleeping 1 ms between polls), and the calling thread pushes up
/// to 100 strings of its own while popping and printing until it sees an empty queue.
fn test_String_queue() {
    let pause = time::Duration::from_millis(1);
    let shared = Arc::new(SafeQueue::<String>::new());

    // One producer per label; each thread gets its own `Arc` handle to the same queue.
    for label in ["Send from sender1:", "Send from sender2:"] {
        let producer = Arc::clone(&shared);
        go! {
            for i in 0..100 {
                producer.push(format!("{}{}", label, i));
            }
        }
    }

    // Detached consumer: it never exits on its own, it just keeps polling.
    let consumer = Arc::clone(&shared);
    go! {
        loop {
            println!("Get From Thread {:?}", consumer.pop());
            thread::sleep(pause);
        }
    }

    let mut sent = 0;
    loop {
        thread::sleep(pause);
        if sent < 100 {
            // The main thread produces as well.
            let mut message = String::from("Send from main:");
            message.push_str(&sent.to_string());
            shared.push(message);
            sent += 1;
        }
        if shared.empty() {
            break;
        }
        println!("Get From main {:?}", shared.pop());
    }
}
But when I try to pass a closure instead, it fails to compile. How should I adjust it?
// Closure variant of the String test. As written this does NOT compile: the compiler
// output quoted after this function reports E0599 for the `new()` call below.
fn test_FnOnce_queue() {
// `dyn FnOnce() + Send + Sync` is an unsized type, so the `Sized` bound needed to call
// `new` is not satisfied (this is the line the quoted error points at).
let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());
let queue_clone = queue.clone();
go! {
for i in 0..100{
// NOTE(review): `Box::pin` yields a `Pin<Box<_>>`, which is not the element type declared above.
queue_clone.push(Box::pin(||println!("Send from sender1:{}",&i.to_string())));
}
}
let queue_clone = queue.clone();
go! {
for i in 0..100{
queue_clone.push(Box::pin(||println!("Send from sender2:{}",&i.to_string())));
}
}
let mut num = 0;
let queue_clone = queue.clone();
go! {
loop{
// `pop()` returns an `Option`; `unwrap()` panics whenever the queue is empty at this point.
(queue_clone.pop().unwrap())();
thread::sleep(time::Duration::from_millis(1));
}
}
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
queue.push(Box::pin(|| println!("Send from main:{}", &num.to_string())));
num += 1;
}
if queue.empty() {
break;
}
// The popped closure is dropped here without being called.
queue.pop();
}
}
ERROR:
error[E0599]: the function or associated item new
exists for struct SafeQueue<dyn FnOnce() + Send + Sync>
, but its trait bounds were not satisfied
--> src\main.rs:412:67
|
8 | struct SafeQueue
| ------------------- function or associated item new
not found for this struct
...
412 | let queue = Arc::new(SafeQueue::<dyn FnOnce() + Send + Sync>::new());
| ^^^ function or associated item cannot be called on SafeQueue<dyn FnOnce() + Send + Sync>
due to unsatisfied trait bounds
|
::: C:\Users\Eurax.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\ops\function.rs:240:1
|
240 | pub trait FnOnce {
| ---------------------- doesn't satisfy dyn FnOnce() + Send + Sync: Sized
|
= note: the following trait bounds were not satisfied:
dyn FnOnce() + Send + Sync: Sized
You're boxing the closures, so you need to reflect that in the type signature:
SafeQueue::<Pin<Box<dyn FnOnce() + Send + Sync>>>::new())
I'm not sure why you're using Pin here, though; you probably don't need it.
1 Like
Thanks, I fixed it:
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// Thread-safe container that is cheap to share between threads.
///
/// Items live in a `Vec` guarded by a `Mutex` behind an `Arc`. `push` appends to the end
/// of the `Vec` and `pop` removes from that same end, so the most recently pushed item
/// comes out first.
#[derive(Clone)]
struct SafeQueue<T>
where
    T: Send,
{
    // The `Arc<Mutex<..>>` wrapper is what lets several threads share and mutate the buffer.
    queue: Arc<Mutex<Vec<T>>>,
}

impl<T: Send> SafeQueue<T> {
    /// Builds a container with no items in it.
    fn new() -> SafeQueue<T> {
        let storage = Vec::new();
        SafeQueue {
            queue: Arc::new(Mutex::new(storage)),
        }
    }

    /// Reports whether the container currently holds no items.
    ///
    /// Panics if the mutex has been poisoned.
    fn empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Appends `item` at the end of the buffer.
    ///
    /// Panics if the mutex has been poisoned.
    fn push(&self, item: T) {
        self.queue.lock().unwrap().push(item);
    }

    /// Removes and returns the item at the end of the buffer, or `None` when it is empty.
    ///
    /// Panics if the mutex has been poisoned.
    fn pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop()
    }
}
fn test_String_queue() {
// Create a shared queue to store strings and convert the shared queue to Arc smart Pointers
let queue = Arc::new(SafeQueue::<String>::new());
// Create a child thread. We use move here. Since our queue is Arc, the move is actually a clone
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender1:".to_owned() + &i.to_string());
}
});
let queue_clone = queue.clone();
thread::spawn(move || {
for i in 0..100 {
queue_clone.push("Send from sender2:".to_owned() + &i.to_string());
}
});
let mut num = 0;
let queue_clone = queue.clone();
thread::spawn(move || {
loop {
println!("Get From Thread {:?}", queue_clone.pop());
thread::sleep(time::Duration::from_millis(1));
}
});
loop {
thread::sleep(time::Duration::from_millis(1));
if num < 100 {
// Sending data to the queue in the main thread
queue.push("Send from main:".to_string() + &num.to_string());
num += 1;
}
if queue.empty() {
break;
}
println!("Get From main {:?}", queue.pop());
}
}
/// Exercises a queue of boxed `FnOnce` closures from several threads at once.
///
/// Two detached producer threads push 100 closures each. A detached consumer thread and the
/// calling thread both pop closures and run them. The function returns the first time the
/// calling thread finds nothing to pop.
fn test_FnOnce_queue() {
    let queue = Arc::new(SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new());

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            // Pop and test in a single step. Checking `empty()` first and then calling
            // `pop().unwrap()` can panic, because another thread may take the last item
            // between the two calls.
            if let Some(job) = queue_clone.pop() {
                job();
            }
            thread::sleep(time::Duration::from_millis(1));
        }
    });

    let mut num = 0;
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", num)));
            num += 1;
        }
        // Same single-step pop as in the consumer thread; an empty queue ends the test.
        match queue.pop() {
            Some(job) => job(),
            None => break,
        }
    }
}
// Entry point: runs the String test first and the closure test second.
fn main() {
test_String_queue();
test_FnOnce_queue();
}
The API presented is a stack (push and pop at the same end, LIFO), not a queue (FIFO or double-ended). For a queue, std::collections::VecDeque is a good building block.
Wrapping the queue in a second Arc (the double-Arc) is not recommended. You can instead implement Clone:
impl<T: Send> Clone for SafeQueue<T> {
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
}
}
}
Now you can just clone the queue all you want, and it defers to the internal Arc::clone.
I fixed it:
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// A thread-safe FIFO queue that is cheap to clone and share between threads.
///
/// Every clone is another handle to the same mutex-guarded buffer, so an item pushed
/// through one handle can be popped through any other. Items come out in the order they
/// were pushed.
struct SafeQueue<T> {
    // `VecDeque` (written with its full path so no extra `use` is required) removes from
    // the front in O(1). A `Vec` can only pop cheaply from the back, which would turn
    // this into a stack.
    queue: Arc<Mutex<std::collections::VecDeque<T>>>,
}

// Cloning only clones the inner `Arc`, so the queue itself can be handed to threads
// without wrapping it in a second `Arc`.
impl<T: Send> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
        }
    }
}

impl<T> SafeQueue<T> {
    /// Creates an empty queue.
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(std::collections::VecDeque::new())),
        }
    }

    /// Returns `true` when the queue currently holds no items.
    ///
    /// Another thread may push or pop right after this returns, so the answer is only a
    /// snapshot. Panics if the mutex has been poisoned.
    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    /// Appends `item` at the back of the queue.
    ///
    /// Panics if the mutex has been poisoned.
    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    ///
    /// Panics if the mutex has been poisoned.
    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop_front()
    }
}
/// Exercises `SafeQueue<String>` from several threads at once.
///
/// Two detached producer threads push 100 strings each, a detached consumer thread pops and
/// prints in an endless loop (sleeping 1 ms between polls), and the calling thread pushes up
/// to 100 strings of its own while popping and printing until it sees an empty queue.
fn test_string_queue() {
    let pause = time::Duration::from_millis(1);
    let shared = SafeQueue::<String>::new();

    // One producer per label; each thread gets its own clone of the queue handle.
    for label in ["Send from sender1:", "Send from sender2:"] {
        let producer = shared.clone();
        thread::spawn(move || {
            for i in 0..100 {
                producer.push(format!("{}{}", label, i));
            }
        });
    }

    // Detached consumer: it never exits on its own, it just keeps polling.
    let consumer = shared.clone();
    thread::spawn(move || {
        loop {
            println!("Get From Thread {:?}", consumer.pop());
            thread::sleep(pause);
        }
    });

    let mut sent = 0;
    loop {
        thread::sleep(pause);
        if sent < 100 {
            // The main thread produces as well.
            let mut message = String::from("Send from main:");
            message.push_str(&sent.to_string());
            shared.push(message);
            sent += 1;
        }
        if shared.empty() {
            break;
        }
        println!("Get From main {:?}", shared.pop());
    }
}
/// Test case for a queue of boxed `FnOnce` closures.
///
/// Two detached producer threads push 100 closures each. A detached consumer thread and the
/// calling thread both pop closures, run them and print the (unit) result. The function
/// returns the first time the calling thread finds nothing to pop.
fn test_fn_once_queue() {
    let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            // Pop and test in a single step. Checking `empty()` first and then calling
            // `pop().unwrap()` can panic, because another thread may take the last item
            // between the two calls.
            if let Some(job) = queue_clone.pop() {
                print!("Receive from thread:{:?} , ", job());
            }
            thread::sleep(time::Duration::from_millis(1));
        }
    });

    let mut num = 0;
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", num)));
            num += 1;
        }
        // Same single-step pop as in the consumer thread; an empty queue ends the test.
        match queue.pop() {
            Some(job) => print!("Receive from main:{:?} , ", job()),
            None => break,
        }
    }
}
// Entry point: runs the String test first and the closure test second.
fn main() {
test_string_queue();
test_fn_once_queue();
}
thx
1 Like
Since you removed the T: Send bound in the rest of the code, you won't need it for the Clone impl, either.
And it's easy to derive the trait implementation, which Clippy will probably tell you about. The whole block can be replaced with:
// Shared queue handle whose `Clone` impl is derived rather than written by hand.
// NOTE(review): `#[derive(Clone)]` normally adds a `T: Clone` bound to the generated
// impl — confirm that is acceptable for every element type stored in the queue.
#[derive(Clone)]
struct SafeQueue<T> {
// The `Arc<Mutex<..>>` wrapper is what lets several threads share and mutate the buffer.
queue: Arc<Mutex<Vec<T>>>,
}
1 Like
I fixed it:
use std::{thread, time};
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// A thread-safe FIFO queue that is cheap to clone and share between threads.
///
/// Every clone is another handle to the same mutex-guarded buffer, so an item pushed
/// through one handle can be popped through any other. Items come out in the order they
/// were pushed.
struct SafeQueue<T> {
    // `VecDeque` (written with its full path so no extra `use` is required) removes from
    // the front in O(1). A `Vec` can only pop cheaply from the back, which would turn
    // this into a stack.
    queue: Arc<Mutex<std::collections::VecDeque<T>>>,
}

// Written by hand instead of derived: cloning only clones the inner `Arc`, so no bound on
// `T` is needed and the queue can hold non-`Clone` items such as boxed closures.
impl<T> Clone for SafeQueue<T> {
    fn clone(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
        }
    }
}

impl<T> SafeQueue<T> {
    /// Creates an empty queue.
    fn new() -> SafeQueue<T> {
        SafeQueue {
            queue: Arc::new(Mutex::new(std::collections::VecDeque::new())),
        }
    }

    /// Returns `true` when the queue currently holds no items.
    ///
    /// Another thread may push or pop right after this returns, so the answer is only a
    /// snapshot. Panics if the mutex has been poisoned.
    fn empty(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        queue.is_empty()
    }

    /// Appends `item` at the back of the queue.
    ///
    /// Panics if the mutex has been poisoned.
    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    ///
    /// Panics if the mutex has been poisoned.
    fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop_front()
    }
}
/// Exercises `SafeQueue<String>` from several threads at once.
///
/// Two detached producer threads push 100 strings each, a detached consumer thread pops and
/// prints in an endless loop (sleeping 1 ms between polls), and the calling thread pushes up
/// to 100 strings of its own while popping and printing until it sees an empty queue.
fn test_string_queue() {
    let pause = time::Duration::from_millis(1);
    let shared = SafeQueue::<String>::new();

    // One producer per label; each thread gets its own clone of the queue handle.
    for label in ["Send from sender1:", "Send from sender2:"] {
        let producer = shared.clone();
        thread::spawn(move || {
            for i in 0..100 {
                producer.push(format!("{}{}", label, i));
            }
        });
    }

    // Detached consumer: it never exits on its own, it just keeps polling.
    let consumer = shared.clone();
    thread::spawn(move || {
        loop {
            println!("Get From Thread {:?}", consumer.pop());
            thread::sleep(pause);
        }
    });

    let mut sent = 0;
    loop {
        thread::sleep(pause);
        if sent < 100 {
            // The main thread produces as well.
            let mut message = String::from("Send from main:");
            message.push_str(&sent.to_string());
            shared.push(message);
            sent += 1;
        }
        if shared.empty() {
            break;
        }
        println!("Get From main {:?}", shared.pop());
    }
}
/// Test case for a queue of boxed `FnOnce` closures.
///
/// Two detached producer threads push 100 closures each. A detached consumer thread and the
/// calling thread both pop closures, run them and print the (unit) result. The function
/// returns the first time the calling thread finds nothing to pop.
fn test_fn_once_queue() {
    let queue = SafeQueue::<Box<dyn FnOnce() + Send + Sync>>::new();

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender1:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        for i in 0..100 {
            queue_clone.push(Box::new(move || println!("Send from sender2:{}", i)));
        }
    });

    let queue_clone = queue.clone();
    thread::spawn(move || {
        loop {
            // Pop and test in a single step. Checking `empty()` first and then calling
            // `pop().unwrap()` can panic, because another thread may take the last item
            // between the two calls.
            if let Some(job) = queue_clone.pop() {
                print!("Receive from thread:{:?} , ", job());
            }
            thread::sleep(time::Duration::from_millis(1));
        }
    });

    let mut num = 0;
    loop {
        thread::sleep(time::Duration::from_millis(1));
        if num < 100 {
            queue.push(Box::new(move || println!("Send from main:{}", num)));
            num += 1;
        }
        // Same single-step pop as in the consumer thread; an empty queue ends the test.
        match queue.pop() {
            Some(job) => print!("Receive from main:{:?} , ", job()),
            None => break,
        }
    }
}
// Entry point: runs the String test first and the closure test second.
fn main() {
test_string_queue();
test_fn_once_queue();
}
I tried that, but since I use Box<dyn FnOnce() + Send + Sync>, deriving Clone results in a compile error.
The current result is very much in line with my expectations.
Oh, that is interesting! Yes, the manual implementation is a little less concerned with what T is (and rightly so). Derive macros unfortunately cannot peek inside and see that it's just an Arc.