How do I pass these references to a function that spawns a tokio task?

I have a bit of code that works exactly as intended:

pub async fn start(num_websockets: i32) {
    let messages_received = Arc::new(Mutex::new(0));
    let (shutdown_signal, _) = broadcast::channel(1);
    let (done, mut wait_for_tasks) = channel::<&str>(1);
    tokio::spawn(stats::message_counter(
        Arc::clone(&messages_received),
        shutdown_signal.subscribe(),
        done.clone(),
    ));
    for _i in 0..num_websockets {
        let (ws_stream, _) = websocket::connect().await.unwrap();
        let (mut write, read) = ws_stream.split();
        subscription::subscribe(&mut write, "book.BTC-PERPETUAL.100ms")
            .await
            .unwrap();
        tokio::spawn(websocket::listener(
            read,
            messages_received.clone(),
            shutdown_signal.subscribe(),
            done.clone(),
        ));
    }

In the for loop, I spawn an arbitrary number of websocket::listener tasks. Each one shuts down when it receives shutdown_signal and lets main know it has finished through done.

When I try to extract this code to a function, like so:

...
    for _i in 0..num_websockets {
        subscribe_and_listen(messages_received.clone(), shutdown_signal.subscribe(), done.clone());
    }
...

async fn subscribe_and_listen(messages_received: Arc<Mutex<i32>>,
                              shutdown_signal: broadcast::Receiver<&str>,
                              done: mpsc::Sender<&str>) {
    let (ws_stream, _) = websocket::connect().await.unwrap();
    let (mut write, read) = ws_stream.split();
    subscription::subscribe(&mut write, "book.BTC-PERPETUAL.100ms")
        .await
        .unwrap();
    tokio::spawn(
        websocket::listener(
            read,
            messages_received.clone(),
            shutdown_signal,
            done.clone(),
        )
    );
}

The compiler tells me:

error[E0759]: `shutdown_signal` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/benchmark.rs:21:9
    |
13  |                                 shutdown_signal: broadcast::Receiver<&str>,
    |                                                  ------------------------- this data with an anonymous lifetime `'_`...
...
21  | /         websocket::listener(
22  | |             read,
23  | |             messages_received.clone(),
24  | |             shutdown_signal,
    | |             --------------- ...is used here...
25  | |             done.clone(),
26  | |         )
    | |_________^
    |
note: ...and is required to live as long as `'static` here
   --> src/benchmark.rs:20:5
    |
20  |     tokio::spawn(
    |     ^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
   --> /Users/b0nes/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.0/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,

I have tried many things, but I simply cannot move this bit of code to a function, and I really want to understand how to do this. It seems to me that there is no need for anything to be 'static, as the function can take full ownership over the results of shutdown_signal.subscribe() and done.clone().

In any case, I didn't explicitly make anything 'static in the for loop example, which works.

Nobody has answered, so I will make a suggestion. Avoid references when passing data to async tasks. References are too short-lived: a spawned task can outlive the scope that created it, so you need to pass data by moving it into the task, not by reference. Arc is part of this, I think. I am not an expert; this is more my personal recipe for getting async to work.

[ Disclaimer: This may be all wrong... I am not expert ]
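
To make the "move it, don't borrow it" recipe concrete, here is a minimal sketch. It uses std::thread::spawn instead of tokio::spawn so that it runs without extra crates; both put a 'static bound on what you hand them. The function name count_with_workers is made up for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `num_workers` threads that each bump a shared counter once,
/// then returns the final count. (Hypothetical helper for illustration.)
fn count_with_workers(num_workers: usize) -> i32 {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = Vec::new();
    for _ in 0..num_workers {
        // Each worker gets its own owned handle to the shared counter.
        let counter = Arc::clone(&counter);
        // `move` transfers ownership of that handle into the closure,
        // so the thread borrows nothing from this stack frame.
        handles.push(thread::spawn(move || {
            *counter.lock().unwrap() += 1;
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    total
}
```

The Arc is what lets several tasks own the same counter at once; passing &counter into the closure instead would be rejected, because the borrow could outlive the function.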


@geebee22 is right. You cannot use a non-'static reference type as the item type of a channel that is moved into a spawned task. Consider using a String.

The reason it works without the function boundary is that the compiler uses the type &'static str rather than &'a str with some other lifetime. You could do this too, but you can only do this if all messages you are going to send are hard-coded string literals somewhere in your source code.
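
A small sketch of that difference, again with std::thread::spawn and std::sync::mpsc standing in for the tokio equivalents (an assumption made so the snippet runs on its own; the 'static bound is the same idea). The names listen and collect_literals are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Writing `mpsc::Receiver<&str>` in this signature would introduce a fresh
// anonymous lifetime, and the spawn below would then be rejected because
// the closure would no longer be `'static`. Spelling out `&'static str`
// promises that only string literals (or other static data) are sent.
fn listen(rx: mpsc::Receiver<&'static str>) -> thread::JoinHandle<Vec<&'static str>> {
    // The receiver is moved into the thread and drained until the channel closes.
    thread::spawn(move || rx.into_iter().collect())
}

fn collect_literals() -> Vec<&'static str> {
    let (tx, rx) = mpsc::channel();
    let handle = listen(rx);
    tx.send("done").unwrap();
    tx.send("shutdown").unwrap();
    // Dropping the only sender closes the channel and ends the iterator.
    drop(tx);
    handle.join().unwrap()
}
```

Inside a single function body the compiler infers &'static str for you from the literals you send, which is why the original for loop compiled without any annotation.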


Wow... I could not tell from the compiler messages that it was related to what I was actually sending on the channel. Usually, the compiler is very clear... but I guess not if you are suffering from Lack Of Knowledge.

I changed both shutdown_signal and done to have the () type, which I should've done from the start as I only use them for signalling and not for passing data, and now everything compiles correctly!
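
For anyone finding this later, a rough sketch of signalling with (). It is not my actual code: it uses std threads and std::sync::mpsc so it runs without tokio, and because std has no broadcast channel it assumes one shutdown channel per listener. The names spawn_listener and run_and_shut_down are made up.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// No lifetime appears in `Receiver<()>` or `Sender<()>`, so both types are
// `'static` and can be moved into a spawned thread without complaint.
fn spawn_listener(shutdown: Receiver<()>, done: Sender<()>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Block until the shutdown signal arrives (or its sender is dropped).
        let _ = shutdown.recv();
        // Report completion; ignore the error if nobody is waiting any more.
        let _ = done.send(());
    })
}

fn run_and_shut_down(num_listeners: usize) -> usize {
    let (done_tx, done_rx) = mpsc::channel();
    let mut shutdown_senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..num_listeners {
        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        shutdown_senders.push(shutdown_tx);
        handles.push(spawn_listener(shutdown_rx, done_tx.clone()));
    }
    // Keep only the listeners' clones alive so `done_rx` can detect the end.
    drop(done_tx);
    for tx in &shutdown_senders {
        tx.send(()).unwrap();
    }
    // Count completion signals; the iterator ends once every listener
    // has finished and dropped its `done` sender.
    let finished = done_rx.iter().count();
    for handle in handles {
        handle.join().unwrap();
    }
    finished
}
```

With this shape, run_and_shut_down(3) should count three completion signals, one per listener.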

Based on the compiler message, it seems that it was tokio::spawn that had issues with stuff not satisfying 'static lifetime requirements. Does this mean that only references need to satisfy lifetime 'static? Because I presume that the other arguments I pass to the function are not 'static as they don't live forever?

And just for my future programming's sake: How do I figure out myself which things I can pass to a tokio::task or channel safely? Again, is it just a reference (&) that won't work?

It's important to understand that "the type is 'static" does not mean "values of this type live forever". The real meaning is "no matter how long you hold on to values of this type, they will never contain dangling references".

If a type is not annotated with lifetimes (other than 'static), then it is always 'static.
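
Here is a minimal sketch of that rule. The helper requires_static is hypothetical; its bound is the same kind of 'static requirement that tokio::spawn places on the future you give it.

```rust
// Accepts any value whose type satisfies the `'static` bound.
fn requires_static<T: 'static>(value: T) -> T {
    value
}

fn static_examples() -> (String, &'static str) {
    // An owned String is created at run time and will be dropped like any
    // other value, yet `String` is `'static`: it holds no borrowed data.
    let owned = requires_static(String::from("created at run time"));

    // A string literal is a `&'static str`, so a reference can qualify too.
    let literal = requires_static("hard-coded literal");

    // A borrow of a local does not qualify. Uncommenting these two lines
    // fails to compile because `local` does not live long enough:
    // let local = String::from("short-lived");
    // requires_static(local.as_str());

    (owned, literal)
}
```

Neither accepted value lives forever; what matters is that neither can ever contain a dangling reference.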


So, a String satisfies 'static in this case and works with tokio::spawn, because the new owners of the String (the function and then the task) are guaranteed that there will be no dangling reference to the underlying data on the heap? In other words, owned types satisfy 'static, and so do references with a 'static lifetime?

I hope I'm close enough for now. Thanks a lot alice.

P.S. I also found this, which seems like a good explanation of my misconceptions: common-rust-lifetime-misconceptions.md in the pretzelhammer/rust-blog repository on GitHub.

Exactly. If you own the string, then the string data pointer cannot become dangling while you still own it.