Unexpected drop (in async closure/block)

Given following example:

// Cargo.toml

[package]
name = "unexpected-drop"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.28"
tokio = { version = "1.30.0", features = ["macros", "rt-multi-thread", "rt"] }

// main.rs

use {
    futures::{
        channel::mpsc::{unbounded, UnboundedReceiver},
        stream, SinkExt, StreamExt,
    },
    std::iter::once,
};

struct Data {
    pub receiver: UnboundedReceiver<usize>,
    pub entity: usize,
}

impl Drop for Data {
    fn drop(&mut self) {
        println!("Dropping Data");
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = unbounded();
    let entity = 0;
    let data = Data { receiver, entity };

    stream::iter(once(data))
        .zip(stream::iter(once(sender)))
        .for_each(|(/*mut*/ data, mut sender)| async move {
            // let data_fix = data;

            println!(
                "Data entity: {}",
                data /*_fix*/
                    .entity
            );
            assert!(sender.send(1).await.is_ok());

            // let maybe_value = data.receiver.next().await;
            // assert!(maybe_value.is_some());
            // assert_eq!(maybe_value.unwrap(), 1);
        })
        .await;
}

This results in the following output:

Dropping Data
Data entity: 0
thread 'main' panicked at 'assertion failed: sender.send(1).await.is_ok()', src/main.rs:37:17

I tried this with the following rustc versions (output of cargo rustc -- --version):

rustc 1.71.1 (eb26296b5 2023-08-03)
rustc 1.73.0-nightly (08d00b40a 2023-08-09)

I would have expected data not to be dropped before the end of the closure (or block? I am not 100% sure of the terminology here) and sender.send to be successful, even if I am not consuming the result via data.receiver. Even more confusingly, I seem to be able to use data.entity just fine even though data is dropped.

Uncommenting data_fix and using it instead of data fixes the issue. Another option is actually using data.receiver by uncommenting the lower 3 lines (and making data mut).

Glimpsing over the suggested forum threads and the rust references entries for block- and closure expressions didn’t yield any insight for me.

What am I missing here?

I think what's happening is that async move is copying data.entity only, which means the actual data can be dropped early. This is located after the async block, but since the async block doesn't execute until later, this will happen before the print.

So when you call .await on the for_each:

  • The two once iterators are advanced
  • The closure is executed on the resulting tuple, returning a Future corresponding to the async block.
  • At the end of this closure, data is dropped. The future contains a copy of data.entity and a move of sender.
  • The future is awaited, which runs the code contained inside.

If you replace the closure with this:

.for_each(|(data, mut sender)| {
    println!("Before async block");
    let block = async move {
        println!("Data entity: {}", data.entity);
        assert!(sender.send(1).await.is_ok());
    };
    println!("After async block");
    block
})

It will print

Before async block
After async block
Dropping Data
Data entity: 0
thread 'main' panicked ...

When you use data anywhere in the async block without the member access, the async block will then take ownership of the entire data, which will be dropped at the end of the async block.

2 Likes

Thank you for the explanation. Writing it this way makes it more clear to me that for_each stores the future generated by the closure until awaiting. The future storing everything it needs and the rest being dropped at the end of the closure.

I also assume that a partial move occurs here (and with entity being usize it extends to some form of ‘partial copy’).

Withal, I consider the question answered.