Sender implicitly dropped inside a generator

I had a bug where a futures mpsc::Sender was dropped too early inside a generator.
I managed to create a small example demonstrating the problem.

The code below contains two execution tasks that run simultaneously:
looper() takes two receivers, and handles incoming messages from the two receivers.
Currently it prints the string “Event” for every incoming message from one of the receivers.
If any receiver is closed, looper exits with an error.

The other execution task is runner(). It sends messages to looper(), but only through one channel sender (sa). It doesn’t use the other channel sender (sb).

Surprisingly, it seems like sb is being dropped very early, which causes looper() to exit with an error, and causes sa.send(()) inside runner() to fail, because all the receiving sides of the channels where dropped.

#![feature(generators)]
extern crate futures_await as futures;
extern crate tokio_core;

use futures::{Sink, Stream, stream};
use futures::prelude::{async, await};
use futures::sync::mpsc;
use tokio_core::reactor::Core;

#[async]
fn looper(ra: mpsc::Receiver<()>,
          rb: mpsc::Receiver<()>) -> Result<(),()> {

    let ra = ra
        .map_err(|_| ())
        .chain(stream::once(Err(())));

    let rb = rb
        .map_err(|_| ())
        .chain(stream::once(Err(())));

    #[async]
    for _event in ra.select(rb) {
        println!("Event");
    }

    Ok(())
}

#[async]
fn runner(mut sa: mpsc::Sender<()>,
          sb: mpsc::Sender<()>) -> Result<(),()> {
    for _ in 0 .. 8usize {
        sa = await!(sa.send(())).unwrap();
    }
    // drop(sb); // <-- If uncommented, everything works fine.
    Ok(())
}

fn main() {
    let (sa, ra) = mpsc::channel::<()>(0);
    let (sb, rb) = mpsc::channel::<()>(0);

    let mut core = Core::new().unwrap();
    let handle = core.handle();
    handle.spawn(looper(ra,rb));

    core.run(runner(sa,sb)).unwrap();
}

My Cargo.toml:

[package]
name = "early_drop_channel"
version = "0.1.0"
authors = ["real"]

[dependencies]
futures = "0.1"
tokio-core = "0.1"
futures-await = "0.1"

Output:

Event
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', libcore/result.rs:1009:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

The failure happens at the line sa = await!(sa.send(())).unwrap();
I noticed that if I add drop(sb) to the end of runner(), sb is not dropped early, and the code run as I would expect.

Output when adding the drop(sb) line:

Event
Event
Event
Event
Event
Event
Event
Event

Is this behaviour the usual in Rust? Are arguments dropped as early as possible?

In “normal” Rust values are guaranteed to live until the end of the scope if they are bound. It seems like generators don’t follow this rule according to your example, but my testing (on the playground) seems to show they do:

struct Bar(u32);

impl Drop for Bar {
    fn drop(&mut self) {
        println!("Dropping {}", self.0);
    }
}

fn foo() -> impl Generator<Yield = (), Return = ()> {
    static || {
        println!("before 1");
        let _bar1 = Bar(1);
        println!("after 1");
        yield;
        println!("before 2");
        let _bar2 = Bar(2);
        println!("after 2");
        yield;
        println!("before end");
    }
}

fn main() {
    let mut f = foo();
    println!("{:?}", unsafe { f.resume() });
    println!("{:?}", unsafe { f.resume() });
    println!("{:?}", unsafe { f.resume() });
}

gives

before 1
after 1
Yielded(())
before 2
after 2
Yielded(())
before end
Dropping 2
Dropping 1
Complete(())

I wonder if there’s something else the #[async] macro is doing that affects this behaviour (it is taking in arguments, but trying out some variations with arguments still doesn’t reproduce what you see).

It does seem reasonable to me that generators would have different behaviour here for optimization purposes. If the variable is not used across yield points then it’s likely better to use the real stack frame of the resume call for it instead of the saved stack frame of the generator (if not for performance, then for memory usage when you have a lot of suspended generators). It’s only non-trivial Drop implementations that could observe this difference.

One question is what version of nightly are you using? This seems like it could have been an oversight of the original generators implementation that was fixed after the builtin async/await support got more users of the feature. I know there have definitely been some bug fixes related to dropping in the last couple of months.

@Nemo157: Thanks for checking this out. I didn’t know one can define Generators this way!
Regarding my nightly version, I’m using the latest:

$ rustc --version
rustc 1.31.0-nightly (fc403ad98 2018-09-30)
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.08s                                                                                                                                                      
     Running `target/debug/early_drop_channel`
Event
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', libcore/result.rs:1009:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

However, I’m not using the 2018 edition (Remove the line from Cargo.toml). If I turn it on the #[async] macro stops working, because it complains about the async keyword being a reserved keyword.

I would actually be happy to use async as a keyword, is it already working in the 2018 edition with futures?

Oh, the issue is that #[async] is missing something I thought it had. Using cargo expand you can see the resulting generator for runner:

fn runner(
    mut sa: mpsc::Sender<()>,
    sb: mpsc::Sender<()>,
) -> impl ::futures::__rt::MyFuture<Result<(), ()>> + 'static {
    ::futures::__rt::gen(move || -> Result<(), ()> {
        let __e: Result<(), ()> = {
            {
                for _ in 0..8usize {
                    sa = {
                        let mut future = sa.send(());
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::std::result::Result::Ok(
                                    ::futures::Async::Ready(e),
                                ) => break ::futures::__rt::std::result::Result::Ok(e),
                                ::futures::__rt::std::result::Result::Ok(
                                    ::futures::Async::NotReady,
                                ) => {}
                                ::futures::__rt::std::result::Result::Err(e) => {
                                    break ::futures::__rt::std::result::Result::Err(e)
                                }
                            }
                            yield ::futures::Async::NotReady
                        }
                    }.unwrap();
                }
                Ok(())
            }
        };

        #[allow(unreachable_code)]
        {
            return __e;
            loop {
                yield ::futures::Async::NotReady
            }
        }
    })
}

Note that nowhere in here does it move sb into the generator, so it will be dropped before runner even returns. I thought that #[async] put a line like let (mut sa, sb) = (sa, sb); into the start of the generator to avoid issues like this. I’m not sure why you get a single event through before looper notices that sb was dropped, swapping the order they’re polled in with rb.select(ra) causes the crash to happen before the first event so it’s likely somehow related to the select adapter’s implementation.

The simplest fix is to add let sb = sb; to the start of runner to make the sender actually be moved into the generator (or an explicit drop as you noted).

@Nemo157 : Thanks, I didn’t know about cargo expand. The output is pretty interesting!
I think that having a let sb = sb line at the beginning is nicer than having a drop(sb) at the end.
I am not sure if this problem is even worth reporting, as the tokio developers are probably busy with the newer async syntax.

A bit unrelated: Can cargo expand be used to expand the compile errors I sometimes see for #[async]?
I have this problem when working with #[async]: If there is any compile error, the compiler won’t show the exact location of the error. Instead, it will point to the #[async] line. I became better at guessing where the compile error is after a while, but for some compile errors it could take a really long time to figure it out.

I’ve opened rust-lang/rust#54716 to try and have this fixed with the builtin async/await support. futures-await is pretty much in maintenance mode so I wouldn’t expect the existing developers to fix it, but if you (or someone else still using it) feels like fixing it you might be able to get someone to merge and release a new build.

Unfortunately it will mostly not work when there’s compile errors. Depending on where in the compiler the error happens it might be possible, but most of the time cargo expand will also error out (it uses rustc to do all the hard work).