Changing self in thread - best practice?

I'm working on a small project and part of it spawns a webserver in a thread (which is killed later on).

I need to set the value of the struct that spawns the thread and I've been using channels for this. Is this considered good or bad practice? Is it common?

Coming from the world of TypeScript where everything is a little looser, I'm looking for some guidance. Cheers!

Channels are a fine tool for concurrent programming, but in order to say whether you're using them appropriately, we'll need to see some code — at least the data structures and the functions that are starting the thread and sending/receiving on the channel.

Context definitely helps... Here's my method -

        let (tx, rx) = channel::unbounded();

        let file_route = warp::get()
            .and(warp::path("callback"))
            .and(warp::path::end())
            // todo: hardcode this in the file and send str
            .and(warp::fs::file("./src/services/index.html"));

        let token_route = warp::post()
            .and(warp::path("token"))
            .and(warp::path::end())
            .and(warp::query::<TokenAuth>())
            .map(move |token: TokenAuth| {
                tx.send(token.access_token).unwrap();
                tx.send("kill".to_string()).unwrap();
                Ok(warp::reply::with_status("OK", http::StatusCode::OK))
            });

        println!("Spawning server for authentication");

        let handlers = token_route.or(file_route);

        let webserver_thread = tokio::spawn(async move {
            spawn(warp::serve(handlers).bind(([127, 0, 0, 1], 3000)))
                .await
                .unwrap();
        });

        let callback_url = self.get_callback_url();
        println!("Go to {} to login", callback_url);

        for msg in rx.recv() {
            match msg.as_str() {
                "kill" => webserver_thread.abort(),
                _ => self.access_token = msg,
            }
        }

    }

It could be cleaned up, I'm sure - but it's my first sort of project with the language. I'm performing an oauth implicit flow using this. Channels used are crossbeam.

Edit: there's a bug in my code where it's accidentally working as desired; calling rx.recv() collects the first value and returns. the kill part is never matched and the thread isn't aborted.

If I am not mistaken, you are trying to implement an OAuth2 workflow.
In that case, channels are a fine way of solving the problem.

1 Like

You can't use blocking channels in async code. Read this for more. Consider using tokio::sync::mpsc instead.

3 Likes

Thanks! That was a good read.

I originally didn't go with tokio threads as they don't implement clone, so I can't use them in my handler.

This code can be blocking, as the oauth flow needs to run before I can do anything else. I had a quick go, but no success... I'll update the thread later if I figure it out


Edit: using tokio::sync::mpsc has clonable channels, but I'm getting a different error this time. closure is 'FnOnce' because it moves the variable tx out of its environment


Second edit: I believe this is because we're moving the sending part of the channel. Removing this leaves me with one final error: type inside async fn body must be known in this context.

A quick snippet of the route:

.and_then(|token: TokenAuth| async {
  tx.send(token.access_token).await?;
  Ok(warp::reply::with_status("OK", http::StatusCode::OK)) // error is here
});

I mean, if you use the unbounded Tokio channel like you did before, then the code should work just like it did before.

1 Like

You're absolutely right! I'm not sure why.

It is because the receiver implements a future but the sender doesn't? I don't need to await the sender

It's because awaiting the send call when you put it in a closure like that has some challenges lifetime-wise.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.