Thread delivers - how to consume in main?

Stupid title, I know.... Sorry...

After a longer absence from Rust I was trying to approach again and ported a little Python OpenCV script successfully. It reads from the cam and does some computations.

Now I was dealing with an RTSP source instead. This source reacts bitchy, if the frames are not captured "in time". Long story short: My Python script did solve that by putting the OpenCV cam_read process on a separate thread and sharing the frames with the background main process, secured by a mutex.

It doesn't matter if the source spits faster as the background is able to consume. Those frames are simply lost then.

I was trying to achieve the same. It was easy to do the cam_read on a separate thread. Now just the sharing of the OpenCV::Mat is a bit difficult.

I tried mpsc first, but this was too slow for a full HD OpenCV Mat stream of 30fps. I'm now with crossbeam-channels but this is still too slow.

My idea would be to have a "global" mutable Mat, which is updated from the thread and a flag, which indicates that there is a new frame. Then, in the background, this flag is observed and the frame read on the other thread is quickly cloned to not stop the thread for too long.

Would one of you have a pointer, how to generally achieve that? I already tired Arc<Mutex<Mat>> to no avail so far...

1 Like

I find it very hard to believe that an mpsc channel is your bottleneck when passing messages at such a slow rate as 60 per second especially given that the data passed over the channel is a single pointer. Have you tried running your code with the receiver just dropping the data?

7 Likes

You would probably want to have a Condvar with the Mutex. I've written a little abstraction with a demo here:

pub mod small_channel {
    use std::error::Error;
    use std::sync::{Arc, Condvar, Mutex};
    use std::{fmt, mem};

    #[derive(Debug)]
    enum State<T> {
        Some(T),
        None,
        Disconnected,
    }

    impl<T> State<T> {
        fn is_none(&self) -> bool {
            matches!(self, State::None)
        }

        fn test_connected(&self) -> Result<(), ChannelClosedError> {
            match self {
                State::Disconnected => Err(ChannelClosedError),
                _ => Ok(()),
            }
        }

        fn take_and_unwrap(&mut self) -> T {
            match mem::replace(self, State::None) {
                State::Some(val) => val,
                _ => unreachable!(),
            }
        }
    }

    #[derive(Debug)]
    struct SharedData<T> {
        cond_var: Condvar,
        data: Mutex<State<T>>,
    }

    #[derive(Debug)]
    struct Shared<T>(Arc<SharedData<T>>);

    impl<T> Drop for Shared<T> {
        fn drop(&mut self) {
            let old = mem::replace(&mut *self.0.data.lock().unwrap(), State::Disconnected);
            self.0.cond_var.notify_all();
            drop(old); // drop potential payload only after unlocking
        }
    }

    #[derive(Debug)]
    pub struct Sender<T>(Shared<T>);
    #[derive(Debug)]
    pub struct Receiver<T>(Shared<T>);

    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        let shared_data = Arc::new(SharedData {
            cond_var: Condvar::new(),
            data: Mutex::new(State::None),
        });

        (
            Sender(Shared(Arc::clone(&shared_data))),
            Receiver(Shared(shared_data)),
        )
    }

    #[derive(Debug)]
    pub struct ChannelClosedError;

    impl fmt::Display for ChannelClosedError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
            f.write_str("usage of a closed channel")
        }
    }

    impl Error for ChannelClosedError {}

    impl<T> Sender<T> {
        /// Puts a new value into the channel; discards old / existing value if it wasn't consumed
        #[rustfmt::skip]
        pub fn send(&self, val: T) -> Result<(), ChannelClosedError> {
            let mut data = self.0.0.data.lock().unwrap();
            data.test_connected()?;
            let old = mem::replace(&mut *data, State::Some(val));
            if old.is_none() { // only if it's None, a receiver might be waiting
                self.0.0.cond_var.notify_one();
            }
            drop(data); // unlock,
            drop(old); // then drop old value
            Ok(())
        }
    }

    impl<T> Receiver<T> {
        /// Gets the current value from the channel if one is present, or blocks and waits for a new one.
        #[rustfmt::skip]
        pub fn recv(&self) -> Result<T, ChannelClosedError> {
            let mut data = self.0.0.cond_var
                .wait_while(self.0.0.data.lock().unwrap(), |data| data.is_none())
                .unwrap();
            data.test_connected()?;
            Ok(data.take_and_unwrap())
        }
    }
}

fn main() {
    use std::thread;
    use std::time::Duration;

    let (tx, rc) = small_channel::channel();
    thread::spawn(move || {
        while let Ok(v) = rc.recv() {
            println!("{}", v)
        }

        println!("shut down second thread successfully");
    });

    tx.send(42).unwrap();

    thread::sleep(Duration::from_millis(100));

    tx.send(1337).unwrap();

    thread::sleep(Duration::from_millis(100));

    println!("sending fast, expecting lots of skips");
    for i in 0..=1_000_000 {
        tx.send(i).unwrap();
    }

    println!("shutting down...");
    drop(tx);

    thread::sleep(Duration::from_millis(100));
}

Rust Playground

Here is the code with mpsc. If the "heavy processing" part is commented, the display is fine and the frame rate is kept. Otherwise the camera display is kind of slow motion. The resolution is 1280x720.

Similar but slightly better timing with crossbeam-channels.

I think it is because there is kind of sync between tx and rx and the communication is not lossy...

Probably it would be a better way to move the cam.read to the main loop and the processing into the thread. Will try that.

    let (tx, rx) = channel();

    thread::spawn(move || {
        loop {
            let mut frame = Mat::default();
            if cam.read(&mut frame).is_err() {
                continue;
            }
            tx.send(frame).unwrap();
        }
    });

   loop {

      let frame = rx.recv().unwrap();

      // Heavy processing of  the frame
      // ...
      // ...
      
      highgui::imshow("Camera", &frame)?;
      let key = highgui::wait_key(1)?;
      if key != -1 {
          break;
      }
   }

Hi @steffan,

Thanks for this. It seems to work great, even though it looks pretty complicated to me. Will try to wrap my head around.

Thanks

EDIT: If I get that right, the flow is the reverse of my target approach, but the similar problem. Here the foreground thread is the "producer" and the background is the "receiver". I suppose the "discard old not consumed values" is the key.

If you want to drop frames when processing is too slow, you could try something like this:

let frame = rx.try_iter().last().unwrap_or_else(|| rx.recv().unwrap());

(Untested; may need to be rearranged some to satisfy the borrow checker)

2 Likes

This works very well! Thanks for the hint. The latency is a bit higher than with @stefan's solution (and even higher than with the Python solution), but at least it is OK.

@steffahn Works perfectly.

@all: Sorry, I need to toggle my network between the Internet and an Anafi drone (the RTSP source). This leads to some confusion while posting here in parallel...

Anyway. I have two working solutions thanks to the helpful ghosts here :slight_smile: Good job.

I didn't contribute here. I guess you wanted to highlight @steffahn?

Yepp. Sorry...

Another approach would be to use sync_channel() and then use tx.try_send().ok(); to only send frame data if the processor has already finished with the previous frame. That avoids the memory bloat of having the channel buffer vast numbers of frames that will never be used.

2 Likes

That, indeed, works perfectly too :slight_smile: Thanks for sharing

Pretty sure you simply want to use a bounded channel with try_send, this returns the message to the caller in an Err for them to deal with if the buffer is full.

1 Like

Though note, unlike my and the other answers using channels, @steffahn has one that will replace the item in the buffer if the sender "laps" the receiver, meaning the reader always gets the latest value.

There's a variety of crates available for this particular pattern that have taken the effort to get even lower latency, for example: https://crates.io/crates/triple_buffer

1 Like

Thanks. ATM I'm having problems to make triple_buffer work with OpenCV::Mat. All attempts end up with "Bus error 10"

No Warranty provided - I simply picked that off plausible looking results under spsc for single-producer-single-consumer. That seemed to have a bunch of decent results.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.