A reference is captured by a future, but why?

Hello everyone. Could you please help me understand what's going on with my code?

So, I have some kind of 'Sender' trait:

trait Sender: Send + 'static {
    fn send(&self, s: String) -> impl Future<Output = ()> + Send;
}

A Sender object is then passed to tokio::spawn, and then sender.send(s).await is called in a loop.
All good so far.
But then I want to encapsulate the call to send in a function:

async fn send<S: Sender>(sender: &S, s: String) {
    // Some additional logic will be here.
    sender.send(s).await;
}

and call send(&sender, s).await inside that loop instead. And this is where the compiler throws a tantrum:

"future cannot be sent between threads safely ... captured value is not Send because & references cannot be sent unless their referent is Sync",

and suggests making Sender Sync (which is not something I want to do).
Apparently, a reference to Sender is now captured by the corresponding future, but I can't understand why it is captured in this case and not when I call sender.send(s).await directly. I mean, the latter is also a function call where self is passed by reference. Why isn't it captured then?
I also wonder if there is a reasonable workaround for this. All I can think of for now is passing Sender to send by value and returning it back.

Here is the full example: Rust Playground

Thanks in advance.

If you have an async fn foo(arguments…) {} then arguments… are always captured by the future this produces. The future that an async fn generates is always such that when you call the async fn initially nothing happens, and it only starts executing when the future is awaited; so all arguments need to be captured.

As a consequence, even if you comment out the actual call to Sender::send completely, your code still doesn’t work if this async fn send is used.

async fn send<S: Sender>(sender: &S, s: String) {
    // sender.send(s).await;
}
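
To make the "nothing happens until awaited" part concrete, here is a minimal std-only sketch. The names bump, block_on and counts_before_and_after_await are made up for illustration, and block_on is just a tiny stand-in for a real executor such as tokio:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny stand-in for an executor: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async fn: the body only runs once the future is polled.
async fn bump(counter: &Cell<u32>) {
    counter.set(counter.get() + 1);
}

// Returns the counter value right after the call and after the await.
fn counts_before_and_after_await() -> (u32, u32) {
    let counter = Cell::new(0);
    // The `&counter` argument is stored in the future; the body has not run.
    let fut = bump(&counter);
    let after_call = counter.get();
    // Driving the future to completion is what finally runs the body.
    block_on(fut);
    (after_call, counter.get())
}
```

Calling counts_before_and_after_await() gives (0, 1): the counter is untouched after the call and only changes once the future is driven. Since the &counter argument has to live inside the future for that to work, the future is only as Send as that reference is.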

A plain fn send(&self, s: String) -> impl Future<Output = ()> can behave differently. To the caller it has the same signature as an async fn, but it doesn’t force the implementor to wrap the whole body in an async { … } block [that’s what actually creates this “do nothing until awaited” behavior].

This is why fn send(&self, s: String) -> impl Future<Output = ()> + Send can make sense in the first place: an implementation (with a !Sync Sender) can be made to work by first converting the &self into something that is actually Send, then wrapping only that into a Future. Such a send function must do a little bit more work immediately when called, rather than just wrapping the arguments and deferring all computation until the .await starts.
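
A rough sketch of what such an implementation could look like, using only std. CountingSender, assert_send, block_on and eager_then_lazy are hypothetical names, and std's mpsc channel stands in for whatever the real sender would wrap:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};

trait Sender: Send + 'static {
    fn send(&self, s: String) -> impl Future<Output = ()> + Send;
}

// Hypothetical sender: Send, but not Sync because of the Cell.
struct CountingSender {
    sent: Cell<usize>,
    tx: mpsc::Sender<String>,
}

impl Sender for CountingSender {
    fn send(&self, s: String) -> impl Future<Output = ()> + Send {
        // Eager part: runs at call time, while `&self` is still around.
        self.sent.set(self.sent.get() + 1);
        let tx = self.tx.clone();
        // Lazy part: the async block owns only Send values, not `&self`.
        async move {
            let _ = tx.send(s);
        }
    }
}

// Compiles only if the value is Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny stand-in for an executor: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns: count right after the call, messages queued before the await,
// and the messages received after the await.
fn eager_then_lazy() -> (usize, usize, Vec<String>) {
    let (tx, rx) = mpsc::channel();
    let sender = CountingSender { sent: Cell::new(0), tx };
    let fut = assert_send(sender.send("hello".to_string()));
    let counted_at_call = sender.sent.get();
    let queued_before_await = rx.try_iter().count();
    block_on(fut);
    let received: Vec<String> = rx.try_iter().collect();
    (counted_at_call, queued_before_await, received)
}
```

Here the counter is already bumped when send returns, but the message only goes into the channel once the future is driven, and assert_send accepts the future even though CountingSender is not Sync.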


Now, if the part marked // Some additional logic will be here. needs to use .await itself, you’re probably out of luck. Let’s see why with some tests in the calling function.

As you noted, this works

async fn run<S: Sender>(sender: S, mut ch_receiver: mpsc::Receiver<String>) {
    loop {
        match ch_receiver.recv().await {
            Some(s) => {
                // This works
                sender.send(s).await;
            }
            None => {
                break;
            }
        }
    }
}

and doesn’t need to capture any &S because it owns the sender: S. Calling sender.send(s) creates a future from the &sender reference. The reference is then no longer needed and the compiler understands that. Only after that, the .await starts.

Similarly, it works with

Some(s) => {
    // Some additional logic *with* awaits
    async {}.await;
    // This works
    sender.send(s).await;
}

This works because the borrow of sender is only created after the previous .awaits are all done. But if instead you do:

Some(s) => {
    let sender_ref = &sender;
    // Some additional logic *with* awaits
    async {}.await;
    // This doesn't work anymore now
    sender_ref.send(s).await;
}

then the compiler complains again. Once again, the call to .send is actually irrelevant:

Some(s) => {
    let sender_ref = &sender;
    // Some additional logic *with* awaits
    async {}.await;
    // This doesn't work either
    sender_ref;
}

Thus, as you can see, if it isn’t even possible to do additional logic with .awaits inline while the &sender reference already exists, then abstracting this into a function with a sender: &S argument is of course impossible, too.


Now… if additional logic does not need any .await though, then you can solve the issue; as first laid out above, you can use a fn … -> impl Future approach, and implement it in a way that does any prior additional logic eagerly, then return the future without capturing the &sender:

fn send<S: Sender>(sender: &S, s: String) -> impl Future<Output = ()> + use<'_, S> {
    // Some additional logic; eagerly executed, can't use .await
    // …
    
    // finally, just return the future without additional `async { ….await }` wrapping
    sender.send(s)
}

(playground)

I had a similar problem a while back; I didn't find any solution for it and gave up.

Slightly more convenient to use, but in the same direction: as a possible workaround when you have exclusive access [as seems to be the case here], you can use &mut S to avoid the Sync requirement, too.
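
A sketch of how that could look, again std-only. CountingSender, send_logged, assert_send, block_on and send_via_mut_ref are hypothetical names, and std::future::ready stands in for the real additional logic:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};

trait Sender: Send + 'static {
    fn send(&self, s: String) -> impl Future<Output = ()> + Send;
}

// Hypothetical sender: Send, but not Sync because of the Cell.
struct CountingSender {
    sent: Cell<usize>,
    tx: mpsc::Sender<String>,
}

impl Sender for CountingSender {
    fn send(&self, s: String) -> impl Future<Output = ()> + Send {
        self.sent.set(self.sent.get() + 1);
        let tx = self.tx.clone();
        async move {
            let _ = tx.send(s);
        }
    }
}

// The future captures a `&mut S`, which is Send whenever S is Send,
// so no Sync bound is needed even with awaits before the send.
async fn send_logged<S: Sender>(sender: &mut S, s: String) {
    // Placeholder for additional logic that awaits something.
    std::future::ready(()).await;
    sender.send(s).await;
}

// Compiles only if the value is Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny stand-in for an executor: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Sends two messages through the wrapper and reports what arrived.
fn send_via_mut_ref() -> (usize, Vec<String>) {
    let (tx, rx) = mpsc::channel();
    let mut sender = CountingSender { sent: Cell::new(0), tx };
    for word in ["a", "b"] {
        let fut = assert_send(send_logged(&mut sender, word.to_string()));
        block_on(fut);
    }
    (sender.sent.get(), rx.try_iter().collect())
}
```

The price is that the caller needs mutable (exclusive) access to the sender for the duration of each call, which is fine in a loop that owns the sender.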

steffahn, thanks a lot for your answers, things are getting clearer now.

Calling sender.send(s) creates a future from the &sender reference. The reference is then no longer needed and the compiler understands that. Only after that, the .await starts.

Yeah, having looked at it again I can now see that it has to work because the future returned by Sender::send is already marked as Send, so any violation of this requirement is a problem of the implementation.
However, I still don't understand why I can implement Sender::send via an async function:

struct SenderImpl {
    another_ch_sender: mpsc::Sender<String>,
}

impl Sender for SenderImpl {
    async fn send(&self, s: String) {
        // Some additional async call
        tokio::time::sleep(Duration::from_millis(100)).await;
        
        let _ = self.another_ch_sender.send(s).await;
    }
}

The above compiles fine, so the future returned by this send is Send. It looks like self is getting some special treatment from the compiler. But then I tried rewriting Sender so that there is no self:

trait Sender: Send + 'static {
    fn send(this: &Self, s: String) -> impl Future<Output = ()> + Send;
}

and it still works:

impl Sender for SenderImpl {
    // How is this different from "send" above?
    async fn send(this: &Self, s: String) {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = this.another_ch_sender.send(s).await;
    }
}

playground

That works because SenderImpl is Sync.
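
The only field in SenderImpl is a tokio mpsc::Sender<String>, which is Sync, so &SenderImpl is Send and the async fn is free to capture it. The rule from the error message (&T is Send only if T is Sync) can be checked at compile time with a small std-only sketch; SyncState, NotSyncState and the helper functions are made-up names:

```rust
use std::cell::Cell;
use std::sync::atomic::AtomicUsize;

// These only compile for types that satisfy the bound.
fn is_send<T: Send>() -> bool {
    true
}

fn is_sync<T: Sync>() -> bool {
    true
}

// Hypothetical stand-in for a Sync type like SenderImpl.
#[allow(dead_code)]
struct SyncState {
    counter: AtomicUsize,
}

// Hypothetical stand-in for a Send but !Sync type.
#[allow(dead_code)]
struct NotSyncState {
    counter: Cell<usize>,
}

fn reference_rules_hold() -> bool {
    // A Sync type can be captured by shared reference in a Send future.
    let sync_ok = is_sync::<SyncState>() && is_send::<&SyncState>();
    // A !Sync type is still Send by value and behind `&mut`.
    let owned_ok = is_send::<NotSyncState>() && is_send::<&mut NotSyncState>();
    // Uncommenting the next line is a compile error, since Cell is not Sync:
    // is_send::<&NotSyncState>();
    sync_ok && owned_ok
}
```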

Gosh you are right, somehow I missed that. Thanks again.