Moving a Sender into a Fn Closure

As stated in the question "Closing multiple hyper servers", I'm trying to shut down a hyper server. However, I can't move the tx of a oneshot channel into the closure, because the closure must implement the Fn trait rather than FnOnce. I've also tried wrapping the sender in an Arc<Mutex> and cloning it inside the closure, but that failed as well.

Does anybody have an idea how to achieve that?

use hyper::rt::{self, Future};
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};

fn main() {
    let addr = ([127, 0, 0, 1], 3000).into();
    let (tx, rx) = futures::sync::oneshot::channel::<()>();
    let server = Server::bind(&addr).serve(|| {
        tx.send(()).unwrap();
        service_fn_ok(move |_: Request<Body>| {
            Response::new(Body::from("Hello World!"))
        })
    });
    let graceful = server
        .with_graceful_shutdown(rx)
        .map_err(|err| eprintln!("server error: {}", err));

    println!("Listening on http://{}", addr);
    rt::run(graceful);
}

You need to put the sender in an option and take it out when sending the message.

Hmm.. And then wrap it into an Arc and clone it?
That gives me the following errors:

  • Not wrapped, without cloning:
expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
  • Not wrapped, with cloning:
no method named `clone` found for type `std::option::Option<tokio_sync::oneshot::Sender<()>>` in the current scope
  • Wrapped, with and without cloning:
Cannot move out of dereference of `std::sync::MutexGuard<'_, std::option::Option<tokio_sync::oneshot::Sender<()>>>`

You need an owned version. To get the owned Sender stored within the Option, use the take function. This works well with oneshot, because you pull the sender out of the Option once, send through it once, and then drop it.

There is no need for cloning to get the owned version of the Sender.

https://doc.rust-lang.org/std/option/enum.Option.html#method.take
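
As a rough illustration of the take approach, here is a minimal sketch using only the standard library. OneshotSender and fire_once are hypothetical names made up for this example; the wrapper merely imitates a oneshot sender by making send consume self, and it is not the futures or tokio type.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for a oneshot sender: `send` takes `self` by value,
/// so each sender can be used at most once.
struct OneshotSender(mpsc::Sender<()>);

impl OneshotSender {
    fn send(self) -> Result<(), mpsc::SendError<()>> {
        self.0.send(())
    }
}

/// Takes the sender out of the slot (leaving `None` behind) and fires it.
/// Returns `true` only on the call that actually sent the signal.
fn fire_once(slot: &mut Option<OneshotSender>) -> bool {
    match slot.take() {
        // `tx` is owned here, so calling the consuming `send` is allowed.
        Some(tx) => tx.send().is_ok(),
        // The sender was already taken by an earlier call.
        None => false,
    }
}
```

No clone is involved: take hands out the owned sender on the first call, and every later call simply sees None.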

You will probably need a Mutex<Option<Sender>> and use something like

if let Some(tx) = tx_mutex.lock().unwrap().take() {
    tx.send(()).unwrap();
}

Here we use take, which replaces the option with None and gives you the previous value in the option. Note that this only gives you the sender once, which is fine because sending on a oneshot channel consumes the sender.

You will probably also need to put move on the serve closure to ensure the mutex is moved into the closure.

It does not seem like you need an Arc, as you do not actually need to share the tx between more than one closure. You do need the Mutex, though: because the closure is an Fn, it may be called concurrently from several threads.
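
To make the Fn and concurrency point concrete, here is a small std-only sketch. call_concurrently is a hypothetical stand-in for an API like serve that only accepts Fn closures and may call them from several threads, and make_shutdown_trigger is likewise an invented name. std::sync::mpsc is assumed here in place of the oneshot channel, so this shows the pattern, not hyper's actual behaviour.

```rust
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for an API that requires an `Fn` closure and
/// invokes it from several threads at once.
fn call_concurrently<F>(f: F, calls: usize)
where
    F: Fn() + Sync,
{
    thread::scope(|s| {
        for _ in 0..calls {
            s.spawn(|| f());
        }
    });
}

/// Builds an `Fn` closure that sends the shutdown signal on its first call only.
fn make_shutdown_trigger(tx: mpsc::Sender<()>) -> impl Fn() + Sync {
    // No Arc: the Mutex is moved into the closure, which becomes its sole owner.
    let tx_mutex = Mutex::new(Some(tx));
    move || {
        // Locking only needs `&Mutex`, so the closure stays `Fn`.
        if let Some(tx) = tx_mutex.lock().unwrap().take() {
            let _ = tx.send(());
        }
    }
}
```

Calling call_concurrently(make_shutdown_trigger(tx), 8) should deliver exactly one message to the receiver, however the threads are scheduled, because only the first thread to take the lock finds Some.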

Wow, thanks. You guys are fantastic!

You should also consider if you want

tx.send(()).unwrap();

or

// ignore the error
let _ = tx.send(());

The send operation will fail exactly when the receiver has been dropped, so the question is whether you consider that an error.
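
A third option is to handle the failure explicitly instead of unwrapping or discarding it. The sketch below assumes std::sync::mpsc as a stand-in for the oneshot channel (its send also fails once the receiver is gone), and notify_shutdown is a hypothetical helper name.

```rust
use std::sync::mpsc;

/// Sends the shutdown signal and reports whether a receiver was still listening.
fn notify_shutdown(tx: mpsc::Sender<()>) -> bool {
    match tx.send(()) {
        Ok(()) => true,
        // The receiver was dropped, so nobody is waiting for the signal.
        Err(mpsc::SendError(())) => false,
    }
}
```

The caller can then decide whether a false result deserves a log line, a panic, or nothing at all.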