I had a bug where a futures mpsc::Sender
was dropped too early inside a generator.
I managed to create a small example demonstrating the problem.
The code below contains two execution tasks that run simultaneously:
looper()
takes two receivers, and handles incoming messages from the two receivers.
Currently it prints the string "Event" for every incoming message from one of the receivers.
If any receiver is closed, looper
exits with an error.
The other execution task is runner()
. It sends messages to looper()
, but only through one channel sender (sa
). It doesn't use the other channel sender (sb
).
Surprisingly, it seems like sb
is being dropped very early, which causes looper()
to exit with an error, and causes sa.send(())
inside runner()
to fail, because all the receiving sides of the channels where dropped.
#![feature(generators)]
extern crate futures_await as futures;
extern crate tokio_core;
use futures::{Sink, Stream, stream};
use futures::prelude::{async, await};
use futures::sync::mpsc;
use tokio_core::reactor::Core;
#[async]
fn looper(ra: mpsc::Receiver<()>,
rb: mpsc::Receiver<()>) -> Result<(),()> {
let ra = ra
.map_err(|_| ())
.chain(stream::once(Err(())));
let rb = rb
.map_err(|_| ())
.chain(stream::once(Err(())));
#[async]
for _event in ra.select(rb) {
println!("Event");
}
Ok(())
}
#[async]
fn runner(mut sa: mpsc::Sender<()>,
sb: mpsc::Sender<()>) -> Result<(),()> {
for _ in 0 .. 8usize {
sa = await!(sa.send(())).unwrap();
}
// drop(sb); // <-- If uncommented, everything works fine.
Ok(())
}
fn main() {
let (sa, ra) = mpsc::channel::<()>(0);
let (sb, rb) = mpsc::channel::<()>(0);
let mut core = Core::new().unwrap();
let handle = core.handle();
handle.spawn(looper(ra,rb));
core.run(runner(sa,sb)).unwrap();
}
My Cargo.toml:
[package]
name = "early_drop_channel"
version = "0.1.0"
authors = ["real"]
[dependencies]
futures = "0.1"
tokio-core = "0.1"
futures-await = "0.1"
Output:
Event
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', libcore/result.rs:1009:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
The failure happens at the line sa = await!(sa.send(())).unwrap();
I noticed that if I add drop(sb)
to the end of runner()
, sb
is not dropped early, and the code run as I would expect.
Output when adding the drop(sb)
line:
Event
Event
Event
Event
Event
Event
Event
Event
Is this behaviour the usual in Rust? Are arguments dropped as early as possible?