There is no reactor running, must be called from the context of a Tokio 1.x runtime

I really don't know what went wrong :pensive:

main.rs

mod conf;

use std::path::Path;
use std::sync::Arc;
use notify::{Event, Watcher, Error, EventKind, RecommendedWatcher, RecursiveMode};
use notify::event::ModifyKind;
use tokio::{runtime,sync::mpsc::{channel, Receiver}};
use std::sync::RwLock;
use std::time::Duration;
use crate::conf::Conf;

fn main() -> Result<(), anyhow::Error> {
    let hd = dirs::home_dir().unwrap();
    let hd_str = hd.to_str().unwrap();
    let conf_path = format!("{}/.test/conf.toml", hd_str);

    let (num_tokio_worker_threads, max_tokio_blocking_threads) = (num_cpus::get(), 512); // 512 is tokio's current default
    println!("{}", num_tokio_worker_threads);
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(8 * 1024 * 1024)
        .worker_threads(num_tokio_worker_threads)
        .max_blocking_threads(max_tokio_blocking_threads)
        .build()?;

    let config = Arc::new(RwLock::new(conf::Conf::load_conf(conf_path.as_ref()).unwrap()));
    let config_cloned = Arc::clone(&config);

    let cp_cloned = conf_path.clone();

    tokio::task::spawn(async move {
        async_watch(&cp_cloned, config_cloned);
    });


    rt.block_on(async move {
        tokio::time::sleep(Duration::from_secs(60)).await;
    });
    println!("{:?}", config.read().unwrap().listen_port);
    Ok(())
}


/// Creates a file-system watcher whose notifications are forwarded into a
/// Tokio mpsc channel.
///
/// Returns the watcher (which must be kept alive for events to keep flowing)
/// together with the receiving half of the channel.
///
/// # Panics
///
/// The event callback panics if the event cannot be sent on the channel.
fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
    // Bounded channel with room for a single pending event.
    let (sender, receiver) = channel(1);

    // `RecommendedWatcher` picks the best backend for the current platform;
    // a specific backend (e.g. INotifyWatcher) could be used directly instead.
    let forward_event = move |res: notify::Result<Event>| {
        sender.blocking_send(res).unwrap()
    };
    let watcher = RecommendedWatcher::new(forward_event)?;

    Ok((watcher, receiver))
}

/// Watches `path` and replaces the shared configuration whenever the watched
/// file is reported as modified.
///
/// * `path` - location of the configuration file to watch and reload.
/// * `config_cloned` - shared configuration that is overwritten after every
///   successful reload.
///
/// Reload failures and watcher errors are printed and otherwise ignored; the
/// previously loaded configuration stays in place. The function returns only
/// when the event channel is closed.
///
/// # Panics
///
/// Panics if the watcher cannot be created, if `path` cannot be watched, or
/// if the configuration lock is poisoned.
async fn async_watch<P: AsRef<Path>>(path: P, config_cloned: Arc<RwLock<Conf>>) {
    // Borrow the path once; no clone or UTF-8 round-trip is needed.
    let watched: &Path = path.as_ref();
    let (mut watcher, mut rx) = async_watcher().unwrap();

    // Add a path to be watched. All files and directories at that path and
    // below will be monitored for changes.
    watcher.watch(watched, RecursiveMode::Recursive).unwrap();

    while let Some(res) = rx.recv().await {
        match res {
            Ok(event) => {
                // Backends report modifications with varying precision
                // (e.g. `Modify(Data(_))` rather than `Modify(Any)`), so accept
                // every kind of modify event instead of one exact variant.
                if matches!(event.kind, EventKind::Modify(_)) {
                    match conf::Conf::load_conf(watched) {
                        Ok(new_config) => {
                            // The write guard is released at the end of this
                            // statement, before the next `.await`.
                            *config_cloned.write().unwrap() = new_config;
                        }
                        Err(e) => println!("{}", e),
                    }
                }
            }
            Err(e) => println!("watch error: {:?}", e),
        }
    }
}

conf.rs

use std::sync::Arc;
use std::path::{Path, PathBuf};
use std::fs::File;
use std::io::Read;
use tokio::sync::RwLock;
use serde::{Deserialize};

/// Application configuration, deserialized from the TOML configuration file.
#[derive(Debug, Deserialize)]
pub struct Conf {
    /// Value of the `listen_port` key (presumably the port the service
    /// listens on — confirm against the code that consumes it).
    pub listen_port: u16,
}

impl Conf {
    pub fn load_conf(conf_path: &Path) -> Result<Conf, anyhow::Error> {
        match File::open(conf_path) {
            Ok(mut file) => {
                let mut str_val = String::new();
                match file.read_to_string(&mut str_val) {
                    Ok(_) => {
                        match toml::from_str::<Conf>(&str_val) {
                            Ok(c) => {
                                Ok(c)
                            }
                            Err(e) => {
                                Err(anyhow::Error::msg(format!("load conf failed with error: {}", e)))
                            }
                        }
                    }
                    Err(e) => {
                        Err(anyhow::Error::msg(format!("load conf failed with error: {}", e)))
                    }
                }
            }
            Err(e) => {
                Err(anyhow::Error::from(e))
            }
        }
    }
}

[package]
name = "explorer"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix = "0.10"
actix-web = "4"
actix-web-actors = "4.1.0"
anyhow = "1.0.23"
dirs = "2.0.2"
toml = "^0.5"
crossbeam-channel = "0.4.0"

[dependencies.num_cpus]
version = "1"

[dependencies.serde]
version = "1"
features = ["derive"]

[dependencies.tokio]
version = "1.18.1"
features = ["full"]

# Inside a `[dependencies.<name>]` table the version requirement is given with
# the `version` key, not with a key named after the crate.
[dependencies.notify]
version = "5.0.0-pre.15"
features = ["default"]

Your call to tokio::task::spawn happened outside of a Tokio runtime. Try using rt.spawn instead.

Anyway, there seem to be some issues related to blocking in your code. Please read this.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.