Endless Worker Thread


#1

Hey there,

this is my first topic, and I'm struggling with a few cases in Rust.
I first developed this kind of loop/app in Java and now want to compare how to implement the same case in Rust.

Imagine you have this program:

struct Queue {
    items: Vec<&'static str>
}

impl Queue {
    pub fn print(&self) {
        loop {
            for item in &self.items {
                println!("{}", item);
            }
        }
    }
}

pub fn main() {
    let mut q = Queue { items: vec![ "first", "second", "third" ] };
    q.print();
    q.items.push("fourth");
}

How do I run Queue on a thread so that the loop does not block, and "fourth" gets added to the queue and processed later on?

Thank you for any help
Dominik


#2

The way you stated it, it's almost impossible: you want to own a mutable Queue and modify it while simultaneously sharing a reference to it with another thread. This is exactly what Rust is designed to prevent.

If you want to pass objects to another thread and process them there, you should use channels:

use std::sync::mpsc::channel;
use std::thread;

let (tx, rx) = channel();

let _guard = thread::spawn(move || {
  // recv() blocks until a message arrives; the loop ends when the sender is dropped
  while let Ok(msg) = rx.recv() {
     println!("{}", msg);
  }
});

let q = Queue { items: vec![ "first", "second", "third" ] };
for item in &q.items {
  tx.send(*item).unwrap(); // deref: iterating &q.items yields &&'static str
}

tx.send("fourth").unwrap();

If you want to pass a mutable reference to a Queue struct across threads, you will have to use Arc and Mutex. You may want to abstract such behavior in the Queue struct's inherent methods.

In other words, inter-thread communication and synchronization is not an easy topic on its own, let alone when it is mixed with shared data access. You should generally avoid such things and use channels to pass around cloned copies of the data to process.


#3

Okay, so I found a suitable solution (for everyone running into the same problem/situation):

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();

    let _guard = thread::spawn(move || {
        let mut items = vec![];
        loop {
            if let Ok(msg) = rx.try_recv() {
                items.push(msg);
            }
            for item in &items {
                println!("iter {}", item);
            }
        }
    });

    let items = vec![ "first", "second", "third" ];
    for item in items {
        println!("sending {}", item);
        tx.send(item).unwrap();
    }

    tx.send("fourth").unwrap();

    _guard.join().unwrap();
}

#4

This doesn’t look like a queue, as the items are never taken out of the vector in the worker thread. Doesn’t the channel itself serve as a queue?


#5

Yes, you're right, but in my case it should be an endless queue,
so no items should ever be removed :wink:

A real queue would not store the items internally and would only process them until it is finished.


#6

It also seems that the worker thread is going to be busy-waiting (eating CPU) all the time, because try_recv doesn't block. Is that the intention?


#7

Yes.

Here is a practical example to show what I want to achieve (it's pseudocode, based on the previous examples):

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();

    let _guard = thread::spawn(move || {
        let mut crawl_queue = vec![];
        loop {
            if let Ok(crawl_entry) = rx.try_recv() {
                crawl_queue.push(crawl_entry);
            }
            // index-based loop: we can't push while iterating over `&crawl_queue`
            let mut i = 0;
            while i < crawl_queue.len() {
                let found_urls = crawl_queue[i].crawl_new_urls(); // this could be a heavy operation and block some time
                for found_url in found_urls {
                    crawl_queue.push(CrawlEntry::new(found_url));
                }
                i += 1;
            } // after one pass everything begins again, because the crawling should never stop
        }
    });

    tx.send(CrawlEntry::new("http://www.google.com/")).unwrap();

    _guard.join().unwrap();
}

Maybe you'll understand my intention now :slight_smile: