Limiting lifetime of a spawned task

I need to forward an outer stream into a websocket stream inside a reconnect loop. Here is my code:

use async_tungstenite::async_std::connect_async;
use async_tungstenite::tungstenite::Message;
use async_tungstenite::WebSocketStream;
use async_tungstenite::async_std::ConnectStream;
use async_std::task;
use futures_util::stream::SplitStream;
use futures_util::stream::SplitSink;
use futures_util::stream::StreamExt;
use futures_util::sink::SinkExt;
use std::time::Duration;
use std::error::Error;

use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};

type WssRecv = SplitStream<WebSocketStream<ConnectStream>>;
type WssSend = SplitSink<WebSocketStream<ConnectStream>, Message>;
type MsgRecv = UnboundedReceiver<Message>;
type MsgSend = UnboundedSender<Message>;

async fn loop_send_msg(wss_send: &mut WssSend, msg_recv: &mut MsgRecv) {
    loop {
        let msg = msg_recv.next().await.unwrap();
        wss_send.send(msg).await;
    }
}

async fn loop_handle_msg(mut wss_recv: WssRecv) -> Result<(), Box<dyn Error>> {
    loop {
        let resp = wss_recv.next().await;
        if resp.is_none() {
            return Ok(()) //should be error but anyway
        }
        println!("{:#?}", resp.unwrap()?);
    }
}

async fn loop_wss(mut msg_recv: MsgRecv) {
    loop {
        let (wss, resp) = connect_async("wss://echo.websocket.org").await.unwrap();
        println!("{:#?}", resp);
        let (mut wss_send, wss_recv) = wss.split();
        let sender_joinhandle = task::spawn(loop_send_msg(&mut wss_send, &mut msg_recv));
        loop_handle_msg(wss_recv).await;
        sender_joinhandle.cancel().await;
    }
}

async fn async_main() {
    let (mut msg_send, msg_recv) = unbounded();
    task::spawn(loop_wss(msg_recv));
    msg_send.send(Message::text("hi")).await.unwrap();
    task::sleep(Duration::from_secs(10)).await;
}

fn main() {
    task::block_on(async_main());
}

The compiler explains the problem well:

error[E0597]: `wss_send` does not live long enough
  --> src/main.rs:38:59
   |
38 |         let sender_joinhandle = task::spawn(loop_send_msg(&mut wss_send, &mut msg_recv));
   |                                             --------------^^^^^^^^^^^^^----------------
   |                                             |             |
   |                                             |             borrowed value does not live long enough
   |                                             argument requires that `wss_send` is borrowed for `'static`
...
41 |     }
   |     - `wss_send` dropped here while still borrowed

error[E0499]: cannot borrow `msg_recv` as mutable more than once at a time
  --> src/main.rs:38:74
   |
38 |         let sender_joinhandle = task::spawn(loop_send_msg(&mut wss_send, &mut msg_recv));
   |                                             -----------------------------^^^^^^^^^^^^^-
   |                                             |                            |
   |                                             |                            mutable borrow starts here in previous iteration of loop
   |                                             argument requires that `msg_recv` is borrowed for `'static`

error: aborting due to 2 previous errors

My question is: can I annotate the lifetime of these borrows so that it ends at the cancel() call?

You’re not using the arguments again later; is there any reason not to transfer ownership into loop_send_msg?

async fn loop_send_msg(mut wss_send: WssSend, mut msg_recv: MsgRecv) {
    loop {
        let msg = msg_recv.next().await.unwrap();
        wss_send.send(msg).await;
    }
}

For example, if the websocket disconnects, execution falls through to the joinhandle's cancel and the loop iterates again (in short, it reconnects). In that scenario I need to pipe msg_recv into the new wss_send, so ownership has to stay in loop_wss.

I have updated my code to reflect that the function listening on the websocket may exit upon errors or unexpected events.

I'm pretty sure you can do all that without any lifetime specifications. I have just been getting into web sockets with tokio-tungstenite this last week and have done something similar. Not a tick mark anywhere.

In general I would suggest that as soon as one starts to think one needs lifetime tick marks, or the compiler suggests them, it is a hint that one is trying to do the wrong thing. One should stop typing immediately, step back from the computer, and have a good long think about one's program structure.

Object lifetimes are not determined by those tick marks. Lifetimes are determined by your program structure: when objects are created, when they are destroyed, and how they are passed around. Get that right and no lifetime specifiers are required. Get it wrong and trying to fix it with lifetimes will lead to frustration.

I would post my web socket code as an example but it has become rather long and complex what with other things I do.

A useful trick is that one can pass channel ends around through channels. If one wants a temporary web socket connection handler thread to communicate with another thread that runs forever, then one can create a channel when the ws connection is accepted and pass the tx end to the long-running thread through another channel. The long-running thread can then use that new channel to write back to the web socket handler. When the web socket connection dies, so does the channel you created for it. The long-running thread will know this because send will fail on that channel.
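A minimal sketch of that trick, using only std threads and std::sync::mpsc as stand-ins for the async tasks and channels discussed in this thread. The names run_demo, register_tx and handler_gone_tx are made up for the illustration; the second channel exists only so the demo runs in a deterministic order.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Returns (the reply the handler received, whether the worker's later send
// failed once the handler and its receiver were gone).
fn run_demo() -> (String, bool) {
    // Long-lived channel: handlers use it to hand the worker a reply Sender.
    let (register_tx, register_rx) = channel::<Sender<String>>();
    // Demo-only channel so the worker knows when the handler has exited.
    let (handler_gone_tx, handler_gone_rx) = channel::<()>();

    // Stands in for the long-running worker.
    let worker = thread::spawn(move || {
        let reply_tx = register_rx.recv().unwrap();
        reply_tx.send("hello from worker".to_string()).unwrap();
        // Wait until the handler has finished and dropped its receiver.
        handler_gone_rx.recv().unwrap();
        // The per-connection channel is dead now, so this send reports an error.
        reply_tx.send("anyone there?".to_string()).is_err()
    });

    // Stands in for a temporary connection handler.
    let handler = thread::spawn(move || {
        let (reply_tx, reply_rx) = channel::<String>();
        register_tx.send(reply_tx).unwrap();
        reply_rx.recv().unwrap()
        // reply_rx is dropped when this closure returns.
    });

    let reply = handler.join().unwrap();
    handler_gone_tx.send(()).unwrap();
    let send_failed = worker.join().unwrap();
    (reply, send_failed)
}

fn main() {
    let (reply, send_failed) = run_demo();
    println!("handler got: {}", reply);
    println!("worker saw the dead channel: {}", send_failed);
}
```

Every value is moved into the thread that uses it, so no lifetime annotations appear anywhere.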

Hmmm.. sorry if that is not so clear an explanation.

When dealing with tasks, everything you send as an argument needs to be owned (i.e. 'static). In your case, consider what would happen if the loop_wss task got cancelled. The loop_send_msg task would then have a reference to an object that no longer exists.

If MsgRecv can be cloned, you can send a clone into the subtask. Otherwise, you can wrap it in an Arc<Mutex<....>> to make something that can be shared between tasks.
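A rough sketch of the Arc<Mutex<...>> route, again with std threads and std::sync::mpsc standing in for the async pieces. The forward function and its limit parameter are hypothetical: limit just simulates a connection that drops after a few messages. In async code a std Mutex guard held across an .await is usually a problem, so an async-aware mutex would probably be needed there; that part is not shown.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// Forwards up to `limit` messages, then returns as if the connection dropped.
fn forward(shared_rx: Arc<Mutex<Receiver<String>>>, limit: usize) -> Vec<String> {
    let mut sent = Vec::new();
    for _ in 0..limit {
        // The lock guard is a temporary, released at the end of this statement.
        let msg = shared_rx.lock().unwrap().recv();
        match msg {
            Ok(m) => sent.push(m),
            Err(_) => break, // all senders are gone
        }
    }
    sent
}

fn run_demo() -> Vec<Vec<String>> {
    let (tx, rx) = channel::<String>();
    let shared_rx = Arc::new(Mutex::new(rx));
    for i in 0..4 {
        tx.send(format!("msg {}", i)).unwrap();
    }
    drop(tx);

    let mut per_connection = Vec::new();
    for _ in 0..2 {
        // Each "connection" gets its own owned handle, which satisfies 'static.
        let rx_for_task = Arc::clone(&shared_rx);
        let handle = thread::spawn(move || forward(rx_for_task, 2));
        per_connection.push(handle.join().unwrap());
    }
    per_connection
}

fn main() {
    println!("{:?}", run_demo());
}
```

The outer loop keeps its own Arc the whole time, so the same consumer end survives across reconnects.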

Actually what I want to do is just as simple as the following:

Make a mpsc channel A, and a websocket channel B.
Forward output of A to input of B.
If B disconnects, reconnect and pipe the message again.

I believe it is a really easy scenario, but since most spawns (both thread spawn and async spawn) require 'static, I can't think of a way to keep the same consumer end of channel A.

Actually I already have an implementation using mpmc, so I can clone a consumer and transfer ownership to the new task, but I wonder whether it can be achieved using mpsc, as it looks totally reasonable to me to have this particular task dropped together with the lifetime of the loop. Maybe spawn is not a good function here?
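For plain threads, the standard library does have a spawn whose lifetime is tied to a block: std::thread::scope (Rust 1.63 and later). The sketch below is only the thread analogue of the situation in this thread, not an async-std solution; forward, sink and limit are hypothetical stand-ins for loop_send_msg, a fresh wss_send, and a connection that drops after a few messages.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Borrows the receiver for the duration of one "connection" only.
fn forward(rx: &mut Receiver<String>, sink: &mut Vec<String>, limit: usize) {
    for _ in 0..limit {
        match rx.recv() {
            Ok(msg) => sink.push(msg),
            Err(_) => return, // all senders are gone
        }
    }
}

fn run_demo() -> Vec<Vec<String>> {
    let (tx, mut rx) = channel::<String>();
    for i in 0..4 {
        tx.send(format!("msg {}", i)).unwrap();
    }
    drop(tx);

    let mut connections = Vec::new();
    for _ in 0..2 {
        let mut sink = Vec::new(); // stands in for a fresh wss_send
        thread::scope(|s| {
            // The scoped thread may borrow rx and sink without 'static,
            // because the scope joins it before returning.
            s.spawn(|| forward(&mut rx, &mut sink, 2));
            // Other per-connection work could run here in parallel.
        });
        // Both borrows have ended; rx is free for the next iteration.
        connections.push(sink);
    }
    connections
}

fn main() {
    println!("{:?}", run_demo());
}
```

Note that the scope waits for the thread to finish rather than cancelling it, so this is not a drop-in replacement for cancel().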

Instead of spawning again, consider catching the panic that would kill the task. Or store data in a custom type with a destructor.

In my web socket efforts:

a) I create a web socket listener that lives forever. It can accept many incoming web socket connections. For each one of those it spawns a new task to handle the connection.

b) I create a task that is expected to run forever and do the work of the application.

c) I create a channel that lives forever. That channel is used to send from the web socket handlers, to the worker thread. The tx end of this channel can be cloned and passed to every new web socket connection handler. The worker gets the rx end when it is spawned.

d) When an incoming web socket connection is established, a new task is spawned to handle it. A new channel is also created. The tx end of that channel is passed to the worker thread in a message through the existing channel to the worker. The worker can now send back to the web socket handler on that new channel.

In this way everything lives for as long as it needs to, and no longer. No lifetime tick marks required.

I still can't explain it well. Hope there is the seed of an idea in there for you.

Actually, in the real implementation I handle the unwraps, so that would not be a big problem there. I was just too lazy to include that here, as it would make the example even longer :rofl: :rofl: :rofl:

If you handle panics and nothing can kill your task, why do you need to recover from canceled tasks? Reconnect the websocket in the task.
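One way to read that suggestion, sketched with std threads and a fake connection type instead of a real websocket. FakeConn, its capacity of two sends, and forward_with_reconnect are all invented for the illustration. The point is only that a single task owns msg_recv for its whole life and replaces the connection in place.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Hypothetical stand-in for a websocket sink: accepts two sends, then
// fails as if the peer had disconnected.
struct FakeConn {
    id: usize,
    remaining: usize,
}

impl FakeConn {
    fn connect(id: usize) -> FakeConn {
        FakeConn { id, remaining: 2 }
    }

    fn send(&mut self, msg: &str) -> Result<String, ()> {
        if self.remaining == 0 {
            return Err(());
        }
        self.remaining -= 1;
        Ok(format!("conn {} sent {}", self.id, msg))
    }
}

// Owns the receiver for the task's entire life; reconnects inside the task
// and retries the message that failed.
fn forward_with_reconnect(msg_recv: Receiver<String>) -> Vec<String> {
    let mut log = Vec::new();
    let mut next_id = 0;
    let mut conn = FakeConn::connect(next_id);
    for msg in msg_recv {
        loop {
            match conn.send(&msg) {
                Ok(line) => {
                    log.push(line);
                    break;
                }
                Err(()) => {
                    // Connection is dead: replace it and try this message again.
                    next_id += 1;
                    conn = FakeConn::connect(next_id);
                }
            }
        }
    }
    log
}

fn run_demo() -> Vec<String> {
    let (tx, rx) = channel::<String>();
    let task = thread::spawn(move || forward_with_reconnect(rx));
    for m in vec!["a", "b", "c"] {
        tx.send(m.to_string()).unwrap();
    }
    drop(tx); // closing the channel lets the task finish
    task.join().unwrap()
}

fn main() {
    for line in run_demo() {
        println!("{}", line);
    }
}
```

Because the receiver is moved into the task once and never borrowed, the 'static requirement of spawn is met without cloning or locking.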
