Move and not move at the same time

I want to implement a push/pull strategy for integration with an external API:
notify about new data, then wait for confirmation that the data was handled
before continuing processing.

The problem is that in one closure I need to move the msg variable and, at the
same time, use notify by reference.
So I have to comment out either
println!("msg: {}", msg); (and remove move) or notify.notified().await;.
Is there any way to tell rustc that I need one of the variables in the closure
by reference and the other one by value?

use futures_util::{pin_mut, stream::StreamExt};

use tokio::{
    stream::{self},
    sync::{oneshot, Notify},
};

#[tokio::main]
async fn main() {
    let mut stream1 = stream::iter(vec![1, 2, 3]);

    let (_stop_read, time_to_stop): (oneshot::Sender<()>, _) = oneshot::channel();
    let mut notify = Notify::new();
    pin_mut!(notify);

    let reader = stream1.for_each(|msg| async move {
        //report new data
        println!("msg: {}", msg);
        notify.notified().await;
    });
    tokio::select! {
        _ = reader => println!("read"),
        _ = time_to_stop => println!("time to stop"),
    };
}

You can make notify an explicit reference, so move only moves that reference.
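
A minimal, std-only sketch of that idea, so it can be run without tokio. Counter is a hypothetical stand-in for Notify, and block_on, NoopWaker and handle_all are illustrative helpers that do not appear in the question. In the question's code the equivalent change would presumably be a `let notify = &notify;` line placed before the for_each call.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `Notify`: shared state that must NOT be moved.
struct Counter(AtomicUsize);

// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny illustrative executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn handle_all(msgs: Vec<String>) -> usize {
    let counter = Counter(AtomicUsize::new(0));
    // Take the reference explicitly: `async move` then moves (copies) only
    // this `&Counter`, while `counter` itself stays owned by this function.
    let counter_ref = &counter;
    let handler = |msg: String| async move {
        // `msg` is moved into the future by value ...
        println!("msg: {}", msg);
        // ... and the shared state is reached through the copied reference.
        counter_ref.0.fetch_add(1, Ordering::SeqCst);
    };
    for msg in msgs {
        block_on(handler(msg));
    }
    // `counter` is still usable here because it was never moved.
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    let msgs = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let handled = handle_all(msgs);
    println!("handled {} messages", handled);
}
```

Because a shared reference is Copy, the closure can be called once per message and every future gets its own copy of the reference, while each msg is still moved by value.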

Alternatively, you can drop the closure and poll the stream directly inside select!:

tokio::select! {
    Some(msg) = stream1.next() => {
        println!("msg: {}", msg);
        notify.notified().await;
    },
    _ = time_to_stop => println!("time to stop"),
};

tokio::select! {
    Some(msg) = stream1.next() => {
        println!("msg: {}", msg);
        notify.notified().await;
    },

But in this case I receive only one message from the stream?

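Right, you would usually put it in a loop in that case. Note that the receiver
is now polled as &mut time_to_stop, so it is not moved into the first iteration: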

loop {
    tokio::select! {
        Some(msg) = stream1.next() => {
            println!("msg: {}", msg);
            notify.notified().await;
        },
        _ = &mut time_to_stop => {
            println!("time to stop");
            break;
        },
    };
}
