I'd like to add an MQTT broker to my project, and started with a small change to the rumqttd sample:
use rumqttd::{Broker, Config};
use tokio::task::JoinHandle;
pub fn setup() -> JoinHandle<()> {
    // see docs of config crate to know more
    let config = config::Config::builder()
        .add_source(config::File::with_name("rumqttd.toml"))
        .build()
        .unwrap();
    // this is where we deserialize it into Config
    let rumqttd_config: Config = config.try_deserialize().unwrap();
    let mut broker = Broker::new(rumqttd_config);
    tokio::spawn(move || {
        // you can use better error handling instead on unwrap(). :)
        broker.start()
    })
}
Which fails to compile with the error message:
error[E0277]: `{closure@src/mqtt_relay.rs:16:18: 16:25}` is not a future
--> src/mqtt_relay.rs:16:18
|
16 | tokio::spawn(move || {
| _____------------_^
| | |
| | required by a bound introduced by this call
17 | | // you can use better error handling instead on unwrap(). :)
18 | | broker.start()
19 | | })
| |_____^ `{closure@src/mqtt_relay.rs:16:18: 16:25}` is not a future
|
= help: the trait `futures::Future` is not implemented for closure `{closure@src/mqtt_relay.rs:16:18: 16:25}`
note: required by a bound in `tokio::spawn`
--> /home/user/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:12
|
164 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
| ----- required by a bound in this function
165 | where
166 | F: Future + Send + 'static,
| ^^^^^^ required by this bound in `spawn`
How can I make my closure implement the Future trait?
Put the async keyword in front of the block, as in the example code for the spawn function. (No closure is used there, as @paramagnetic said.)
Thank you for your reply! I've tried that, but the error barely changes:
error[E0277]: `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
--> src/mqtt_relay.rs:16:18
|
16 | tokio::spawn(async move || {
| _____------------_^
| | |
| | required by a bound introduced by this call
17 | | // you can use better error handling instead on unwrap(). :)
18 | | broker.start()
19 | | })
| |_____^ `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
|
= help: the trait `futures::Future` is not implemented for `{async closure@src/mqtt_relay.rs:16:18: 16:31}`
note: required by a bound in `tokio::spawn`
--> /home/user/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:12
|
164 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
| ----- required by a bound in this function
165 | where
166 | F: Future + Send + 'static,
| ^^^^^^ required by this bound in `spawn`
For more information about this error, try `rustc --explain E0277`.
I then thought maybe I needed to wrap it in an async function, but that didn't help either:
use rumqttd::{Broker, Config};
use tokio::task::JoinHandle;
pub fn setup() -> JoinHandle<()> {
    // see docs of config crate to know more
    let config = config::Config::builder()
        .add_source(config::File::with_name("rumqttd.toml"))
        .build()
        .unwrap();
    // this is where we deserialize it into Config
    let rumqttd_config: Config = config.try_deserialize().unwrap();
    let mut broker = Broker::new(rumqttd_config);
    tokio::spawn(async move || {
        start_broker(broker).await
    })
}
async fn start_broker(mut broker: Broker) -> anyhow::Result<()> {
    broker.start()?;
    Ok(())
}
error:
error[E0277]: `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
--> src/mqtt_relay.rs:16:18
|
16 | tokio::spawn(async move || {
| _____------------_^
| | |
| | required by a bound introduced by this call
17 | | // you can use better error handling instead on unwrap(). :)
18 | | start_broker(broker).await
19 | | })
| |_____^ `{async closure@src/mqtt_relay.rs:16:18: 16:31}` is not a future
|
= help: the trait `futures::Future` is not implemented for `{async closure@src/mqtt_relay.rs:16:18: 16:31}`
note: required by a bound in `tokio::spawn`
--> /home/user/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:12
|
164 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
| ----- required by a bound in this function
165 | where
166 | F: Future + Send + 'static,
| ^^^^^^ required by this bound in `spawn`
use rumqttd::{Broker, Config};
use tokio::task::JoinHandle;
pub fn setup() -> JoinHandle<()> {
    // see docs of config crate to know more
    let config = config::Config::builder()
        .add_source(config::File::with_name("rumqttd.toml"))
        .build()
        .unwrap();
    // this is where we deserialize it into Config
    let rumqttd_config: Config = config.try_deserialize().unwrap();
    // no `mut` needed here: the broker is moved into start_broker, which declares it mutable
    let broker = Broker::new(rumqttd_config);
    // calling the async fn returns a future, which is exactly what spawn expects
    tokio::spawn(start_broker(broker))
}
async fn start_broker(mut broker: Broker) {
    // .ok() discards any error returned by start()
    broker.start().ok();
}
You don't need a closure. spawn spawns a future directly. Drop the ||.
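To make the difference concrete, here is a minimal sketch that needs neither tokio nor rumqttd. accepts_future is a hypothetical stand-in that copies only the bound shown in the error message (F: Future + Send + 'static), and start and start_async are made-up placeholders for broker.start(), so treat it as an illustration rather than the real API:

```rust
use std::future::Future;

/// Hypothetical stand-in for `tokio::spawn`: it only checks the bound from
/// the error message and never runs the future.
fn accepts_future<F>(_future: F) -> &'static str
where
    F: Future + Send + 'static,
{
    "is a future"
}

/// Made-up placeholder for `broker.start()` from the question.
fn start() -> Result<(), String> {
    Ok(())
}

/// Made-up async wrapper; calling it returns a future.
async fn start_async() {
    start().ok();
}

/// Passes the two kinds of argument that satisfy the bound.
fn spawn_arguments() -> (&'static str, &'static str) {
    // An async block (note: no `||`) is a value that implements `Future`.
    let from_block = accepts_future(async move {
        start().ok();
    });
    // Calling an async fn also yields a value that implements `Future`.
    let from_fn = accepts_future(start_async());
    // A closure is not a future. Uncommenting the next line should
    // reproduce error E0277 from the question:
    // accepts_future(move || start());
    (from_block, from_fn)
}
```

Both accepted forms hand over a future value; the closure form hands over something that would still have to be called first, which is why the bound rejects it.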
async functions don't behave like regular functions when called. When you call an async function, it doesn't execute its body; it just returns a future that represents the computation. That future is then polled repeatedly to execute the steps of the computation gradually.
Therefore, it would be useless for spawn() to take an actual function that returns a future; the only thing it could do is call it to obtain the future. Just passing in the future directly is therefore enough – it will be evaluated lazily anyway.
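The lazy evaluation can be observed with the standard library alone. In the sketch below, block_on and NoopWaker are a deliberately tiny hand-written stand-in for a real executor such as tokio (an assumption for illustration, only suitable for futures that never wait on anything), and this start_broker is a hypothetical function that merely records when its body runs:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny stand-in for an executor: polls the future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hypothetical async function that records when its body actually runs.
async fn start_broker(log: Arc<Mutex<Vec<&'static str>>>) {
    log.lock().unwrap().push("body ran");
}

/// Returns the events in the order they happened.
fn event_order() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    // Calling the async fn only builds the future; the body has not run yet.
    let future = start_broker(Arc::clone(&log));
    log.lock().unwrap().push("future created");
    // Polling the future is what drives the body to completion.
    block_on(future);
    let events = log.lock().unwrap().clone();
    events
}
```

The body is recorded only after the future is polled, even though start_broker was called first. A spawn function can therefore take the future itself and poll it later without losing anything.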