Flume question / possible bug

Hi,

I am trying to use flume in a toy project of mine and I'm seeing some odd behaviour. A summary is:

  1. Create 3 send/rx pairs feeding the processing thread
  2. Create another pair for an ack channel
  3. Spin up a processing thread and a "user" thread
  4. Send a,a,b,a from user thread with some sleeps between
  5. On the processing thread, see a,a,a,b instead, with the timings showing that it blocks until the third a arrives

This seems odd to me. What am I doing wrong?

Trivial example below, running on nightly:

Cargo.toml:

[dependencies]
flume = { version = "0.10.1" }

main.rs:

#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(dead_code)]

// use minalog::*;

use std::env;
use std::fs;
use flume::{Receiver, Sender, SendError, Selector};
use std::thread;
use std::thread::sleep;
use std::time::{Instant, Duration};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};


fn main() {
    let (a_send, a_rx) = flume::unbounded::<u8>();
    let (b_send, b_rx) = flume::unbounded::<u8>();
    let (c_send, c_rx) = flume::unbounded::<u8>();
    let (processed_send, processed_rx) = flume::unbounded::<u8>();
    let last_seen = Arc::new(Mutex::new(Instant::now()));

    let a_running = Arc::new(AtomicBool::new(true));
    thread::spawn(move || {
        let done = |name| {
            processed_send.send(1).unwrap();
            name
        };

        loop {
            let sel = Selector::new().
                recv(&a_rx, |msg| {
                    let ty = "a";
                    let since = {
                        let mut last_seen = last_seen.lock().unwrap();
                        let now = Instant::now();
                        let s = now.duration_since(*last_seen);
                        *last_seen = now;
                        s.as_millis()
                    };
                    println!("{} Got incoming of {:?}, took {:?}ms", ty, msg, since);
                    done(ty)
                }).
                recv(&b_rx, |msg| {
                    let ty = "b";
                    let since = {
                        let mut last_seen = last_seen.lock().unwrap();
                        let now = Instant::now();
                        let s = now.duration_since(*last_seen);
                        *last_seen = now;
                        s.as_millis()
                    };
                    println!("{} Got incoming of {:?}, took {:?}ms", ty, msg, since);
                    done(ty)
                }).
                recv(&c_rx, |msg| {
                    println!("Should never get here");
                    done("c")
                });

            match sel.wait_timeout(Duration::from_millis(300)) {
                Ok(event) => {
                    println!("INFO Just finished event {}", event);
                }
                Err(e) => {
                    println!("WARN Got nothing for a while in main-loop, this should be impossible");
                }
            }
        }
    });

    thread::spawn(move || {
        println!("Starting sender");
        for _ in 1..10 {
            a_send.send(0).unwrap();
            sleep(Duration::from_millis(20));
            a_send.send(1).unwrap();
            sleep(Duration::from_millis(20));
            b_send.send(2).unwrap();
            sleep(Duration::from_millis(200));
            a_send.send(3).unwrap();
            sleep(Duration::from_millis(20));
        }
        println!("Done sender");
        sleep(Duration::from_secs(5));
    });

    sleep(Duration::from_secs(5));
    println!("Done");
}

Rust versions etc.:


$ rustup show
Default host: x86_64-pc-windows-msvc
rustup home:  C:\Users\dave\.rustup

installed toolchains
--------------------

stable-x86_64-pc-windows-msvc
nightly-x86_64-pc-windows-msvc (default)

active toolchain
----------------

nightly-x86_64-pc-windows-msvc (default)
rustc 1.51.0-nightly (04caa632d 2021-01-30)

Running your example with some debug logging, it looks like whenever the selector thread gets woken up by a "b" message, it hits the "spurious wakeup" case in flume's select.rs and goes back to sleep until woken by the next "a" message.

I haven't figured out why the token isn't set on these wakeups.

There is a bug in flume’s select, not sure if this is the same thing though: https://github.com/zesterer/flume/issues/44

More info: After each successful message is handled, the loop in your select-receive thread creates a new Selector. However, for some reason, the tokens from "b" messages are pushed to a queue owned by a selector from an earlier iteration of the loop (which has since been destroyed).

This appears to happen because old hooks are not successfully removed in RecvSelection::deinit. I suspect this is because it is comparing the addresses of trait object pointers using Arc::ptr_eq, and running into this issue: https://github.com/rust-lang/rust/issues/46139
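
To illustrate the pitfall, here is a minimal sketch of comparing two trait-object Arcs by data address only, so that the vtable half of the fat pointer is ignored. The Hook trait, MyHook type and same_allocation helper are made up for this example; this is not flume's actual code, and not necessarily how the fix is written.

```rust
use std::sync::Arc;

// Hypothetical trait and type, standing in for a library's internal hooks.
trait Hook {
    fn id(&self) -> u32;
}

struct MyHook(u32);

impl Hook for MyHook {
    fn id(&self) -> u32 {
        self.0
    }
}

/// Returns true if both Arcs point at the same allocation.
/// Casting to a thin `*const ()` pointer drops the vtable pointer,
/// so only the data address takes part in the comparison.
fn same_allocation(a: &Arc<dyn Hook>, b: &Arc<dyn Hook>) -> bool {
    Arc::as_ptr(a) as *const () == Arc::as_ptr(b) as *const ()
}

fn main() {
    let first: Arc<dyn Hook> = Arc::new(MyHook(1));
    let alias = Arc::clone(&first);
    let other: Arc<dyn Hook> = Arc::new(MyHook(1));

    // A clone shares the allocation; a separately created Arc does not.
    assert!(same_allocation(&first, &alias));
    assert!(!same_allocation(&first, &other));
    println!("hook {} compared by data address", first.id());
}
```

The idea is that two fat pointers to the same object can carry different vtable addresses (for example when the vtable is duplicated across codegen units), so a comparison that includes the vtable can report "not equal" for what is really the same hook.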

I submitted a fix for this here:

https://github.com/zesterer/flume/pull/69

Edit: For now, you can also work around this bug by setting codegen-units = 1 in your Cargo.toml. For example:

[profile.dev]
codegen-units = 1

[profile.release]
codegen-units = 1

Hi,

Is creating a new Selector each time around the loop not the expected usage/pattern? I am pretty new to Rust and assumed that because the wait methods take self, they consume the Selector, so I have to make a new one each time. Is there a different way I should be doing this?

Your code is using Selector correctly, as far as I know. This is simply a bug in the flume library, triggered by a bug or weird corner case in the Rust compiler.

Update: flume 0.10.2 has just been released with the fix included. To upgrade to the fixed version, run cargo update inside your project directory. Thanks @zesterer for the quick review!

awesome, just tried it and seems all good now. Thanks for the quick turnaround all.

To avoid similar issues in the future, I think that Arc::ptr_eq should require Self: Sized to avoid people accidentally falling into this trap. To my knowledge, there's no real case in which the current behaviour with trait objects would be desirable (correct me if I'm wrong). This would be a breaking change, but breakages would only be highlighting likely bugs.

Trait objects are not the only kind of DST. Arc::ptr_eq is still useful for comparing Arc<str>s.
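
As a small sketch of that use case (the shares_allocation helper is just a name chosen for this example): Arc<str> is unsized, so a Self: Sized bound on Arc::ptr_eq would rule out the following.

```rust
use std::sync::Arc;

/// Thin wrapper around `Arc::ptr_eq` for `Arc<str>`, which is a DST
/// but not a trait object.
fn shares_allocation(a: &Arc<str>, b: &Arc<str>) -> bool {
    Arc::ptr_eq(a, b)
}

fn main() {
    let a: Arc<str> = Arc::from("flume");
    let b = Arc::clone(&a);
    let c: Arc<str> = Arc::from("flume");

    // `b` is a clone of `a`, so both point at the same allocation.
    assert!(shares_allocation(&a, &b));
    // `c` has equal contents but lives in its own allocation.
    assert!(!shares_allocation(&a, &c));
    assert_eq!(a, c);
}
```

This gives a cheap identity check, for example as a fast path before falling back to a full string comparison.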