Rumqttd broker.start() with tokio::spawn?

I'd like to add an mqtt broker to my project, and started with a small change to their sample:

use rumqttd::{Broker, Config};
use tokio::task::JoinHandle;

pub fn setup() -> JoinHandle<()> {
    // see docs of config crate to know more
    let config = config::Config::builder()
        .add_source(config::File::with_name("rumqttd.toml"))
        .build()
        .unwrap();

    // this is where we deserialize it into Config
    let rumqttd_config: Config = config.try_deserialize().unwrap();

    let mut broker = Broker::new(rumqttd_config);

    tokio::spawn(move || {
        // you can use better error handling instead on unwrap(). :)
        broker.start()
    })
}

Which fails to compile with the error message:

error[E0277]: `{closure@src/mqtt_relay.rs:16:18: 16:25}` is not a future
   --> src/mqtt_relay.rs:16:18
    |
16  |       tokio::spawn(move || {
    |  _____------------_^
    | |     |
    | |     required by a bound introduced by this call
17  | |         // you can use better error handling instead on unwrap(). :)
18  | |         broker.start()
19  | |     })
    | |_____^ `{closure@src/mqtt_relay.rs:16:18: 16:25}` is not a future
    |
    = help: the trait `futures::Future` is not implemented for closure `{closure@src/mqtt_relay.rs:16:18: 16:25}`
note: required by a bound in `tokio::spawn`
   --> /home/user/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:12
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |            ^^^^^^ required by this bound in `spawn`

How can I make my closure implement the Future trait?

You put the async keyword in front of the closure block as in the example code for the spawn function. (No closure is used here as @paramagnetic said.)


Thank you for your reply! I've tried that, but the error barely changes:

error[E0277]: `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
   --> src/mqtt_relay.rs:16:18
    |
16  |       tokio::spawn(async move || {
    |  _____------------_^
    | |     |
    | |     required by a bound introduced by this call
17  | |         // you can use better error handling instead on unwrap(). :)
18  | |         broker.start()
19  | |     })
    | |_____^ `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
    |
    = help: the trait `futures::Future` is not implemented for `{async closure@src/mqtt_relay.rs:16:18: 16:31}`
note: required by a bound in `tokio::spawn`
   --> /home/user/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:12
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |            ^^^^^^ required by this bound in `spawn`

For more information about this error, try `rustc --explain E0277`.

I now thought maybe I need to wrap it in an async function, but that also didn't help:

use rumqttd::{Broker, Config};
use tokio::task::JoinHandle;

pub fn setup() -> JoinHandle<()> {
    // see docs of config crate to know more
    let config = config::Config::builder()
        .add_source(config::File::with_name("rumqttd.toml"))
        .build()
        .unwrap();

    // this is where we deserialize it into Config
    let rumqttd_config: Config = config.try_deserialize().unwrap();

    let mut broker = Broker::new(rumqttd_config);

    tokio::spawn(async move || {
        start_broker(broker).await
    })
}

async fn start_broker(mut broker: Broker) -> anyhow::Result<()> {
    broker.start()?;
    Ok(())
}

error:

error[E0277]: `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
   --> src/mqtt_relay.rs:16:18
    |
16  |       tokio::spawn(async move || {
    |  _____------------_^
    | |     |
    | |     required by a bound introduced by this call
17  | |         // you can use better error handling instead on unwrap(). :)
18  | |         start_broker(broker).await
19  | |     })
    | |_____^ `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
    |
    = help: the trait `futures::Future` is not implemented for `{async closure@src/mqtt_relay.rs:16:18: 16:31}`
note: required by a bound in `tokio::spawn`
   --> /home/user/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:12
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |            ^^^^^^ required by this bound in `spawn`

ahh, now I got it:

use rumqttd::{Broker, Config};
use tokio::task::JoinHandle;

pub fn setup() -> JoinHandle<()> {
    // see docs of config crate to know more
    let config = config::Config::builder()
        .add_source(config::File::with_name("rumqttd.toml"))
        .build()
        .unwrap();

    // this is where we deserialize it into Config
    let rumqttd_config: Config = config.try_deserialize().unwrap();

    let mut broker = Broker::new(rumqttd_config);

    tokio::spawn(start_broker(broker))
}

async fn start_broker(mut broker: Broker) {
    broker.start().ok();
}

You don't need a closure. spawn spawns a future directly. Drop the ||.
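
A minimal sketch of why the closure is rejected, using only the standard library. Note that accepts_future and start are hypothetical stand-ins: accepts_future copies the F: Future + Send + 'static bound from the error message but is not tokio's function and runs nothing, and start is a placeholder for the broker's start call.

```rust
use std::future::Future;

// Hypothetical stand-in with the same bound as in the error message.
// It only type-checks its argument; it does not run the future.
fn accepts_future<F>(_future: F) -> &'static str
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    "accepted"
}

// Hypothetical placeholder for the broker's start call.
fn start() -> Result<(), String> {
    Ok(())
}

async fn start_broker() {
    start().ok();
}

fn main() {
    // An async block is a future, so this compiles.
    println!("{}", accepts_future(async move { start().ok(); }));
    // Calling an async fn returns a future, so this compiles too.
    println!("{}", accepts_future(start_broker()));
    // A closure is not a future, with or without `async` in front (E0277):
    // accepts_future(move || { start().ok(); });
    // accepts_future(async move || { start().ok(); });
}
```

Uncommenting either of the last two lines should bring back the "is not a future" error from the question.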

async functions aren't called like regular functions. When you call an async function, it doesn't execute its body; it returns a future that represents the computation, and that future is polled repeatedly to execute the steps of the computation gradually.

Therefore, it would be useless for spawn() to take an actual function that returns a future; the only thing it could do is call it to obtain the future. Just passing in the future directly is therefore enough – it will be evaluated lazily anyway.
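
To see the laziness in isolation, here is a small sketch that polls a future by hand with only the standard library, no tokio. Everything in it is made up for illustration: start_broker here just sets a flag and returns a number, and NoopWake is a do-nothing waker that is only sufficient because this future never suspends.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Illustrative async fn: records that its body ran, then returns a value.
async fn start_broker(ran: Arc<AtomicBool>) -> u32 {
    ran.store(true, Ordering::SeqCst);
    42
}

// A waker that does nothing; enough to poll a future that never suspends.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (body ran before polling, body ran after polling, output).
fn demo() -> (bool, bool, u32) {
    let ran = Arc::new(AtomicBool::new(false));

    // Calling the async fn only creates the future; the body has not run.
    let fut = start_broker(ran.clone());
    let before = ran.load(Ordering::SeqCst);

    // Polling is what actually drives the body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let out = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("this future never suspends"),
    };

    (before, ran.load(Ordering::SeqCst), out)
}

fn main() {
    let (before, after, out) = demo();
    println!("ran before poll: {before}, after poll: {after}, output: {out}");
}
```

In a real program the runtime does the polling for you; spawn just hands the future over to it.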


To illustrate what @paramagnetic means:

tokio::spawn(async move { broker.start().ok(); })

Thank you for the clarification, and sorry for the late reaction! I've now used it to replace my named async fn with an async move block, see: mqtt_relay: replace start_broker async wrapper fn by async closure + log errors if it fails (8d0f510e) · Commits · thomas351 / rust-influxdb-udp-logger · GitLab
