Hi,
I am trying to use flume in a toy project of mine and I'm seeing some odd behaviour. A summary is:
- Create 3 send/rx pair incoming to processing-thread
- Create another pair for an ack channel
- Spin up a processing thread and a "user" thread
- Send a,a,b,a from user thread with some sleeps between
- See on processing thread a,a,a,b with the sleeps showing that it is blocking waiting on the third a
This seems odd to me. What am I doing wrong?
Trivial example below, running on nightly:
Cargo.toml:
[dependencies]
flume = { version = "0.10.1" }
main.rs
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(dead_code)]
// use minalog::*;
use std::env;
use std::fs;
use flume::{Receiver, Sender, SendError, Selector};
use std::thread;
use std::thread::sleep;
use std::time::{Instant, Duration};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
fn main() {
let (a_send, a_rx) = flume::unbounded::<u8>();
let (b_send, b_rx) = flume::unbounded::<u8>();
let (c_send, c_rx) = flume::unbounded::<u8>();
let (processed_send, processed_rx) = flume::unbounded::<u8>();
let last_seen = Arc::new(Mutex::new(Instant::now()));
let a_running = Arc::new(AtomicBool::new(true));
thread::spawn(move || {
let done = |name| {
processed_send.send(1).unwrap();
name
};
loop {
let sel = Selector::new().
recv(&a_rx, |msg| {
let ty = "a";
let since = {
let mut last_seen = last_seen.lock().unwrap();
let now = Instant::now();
let s = now.duration_since(*last_seen);
*last_seen = now;
s.as_millis()
};
println!("{} Got incoming of {:?}, took {:?}ms", ty, msg, since);
done(ty)
}).
recv(&b_rx, |msg| {
let ty = "b";
let since = {
let mut last_seen = last_seen.lock().unwrap();
let now = Instant::now();
let s = now.duration_since(*last_seen);
*last_seen = now;
s.as_millis()
};
println!("{} Got incoming of {:?}, took {:?}ms", ty, msg, since);
done(ty)
}).
recv(&c_rx, |msg| {
println!("Should never get here");
done("c")
});
match sel.wait_timeout(Duration::from_millis(300)) {
Ok(event) => {
println!("INFO Just finished event {}", event);
}
Err(e) => {
println!("WARN Got nothing for a while in main-loop, this should be impossible");
}
}
}
});
thread::spawn(move || {
println!("Starting sender");
for _ in 1..10 {
a_send.send(0).unwrap();
sleep(Duration::from_millis(20));
a_send.send(1).unwrap();
sleep(Duration::from_millis(20));
b_send.send(2).unwrap();
sleep(Duration::from_millis(200));
a_send.send(3).unwrap();
sleep(Duration::from_millis(20));
}
println!("Done sender");
sleep(Duration::from_secs(5));
});
sleep(Duration::from_secs(5));
println!("Done");
}
rust versions etc
$ rustup show
Default host: x86_64-pc-windows-msvc
rustup home: C:\Users\dave\.rustup
installed toolchains
--------------------
stable-x86_64-pc-windows-msvc
nightly-x86_64-pc-windows-msvc (default)
active toolchain
----------------
nightly-x86_64-pc-windows-msvc (default)
rustc 1.51.0-nightly (04caa632d 2021-01-30)