There is no reactor running, must be called from the context of a Tokio 1.x runtime

I really don't know what went wrong :pensive:

main.rs

mod conf;

use std::path::Path;
use std::sync::Arc;
use notify::{Event, Watcher, Error, EventKind, RecommendedWatcher, RecursiveMode};
use notify::event::ModifyKind;
use tokio::{runtime,sync::mpsc::{channel, Receiver}};
use std::sync::RwLock;
use std::time::Duration;
use crate::conf::Conf;

fn main() -> Result<(), anyhow::Error> {
    let hd = dirs::home_dir().unwrap();
    let hd_str = hd.to_str().unwrap();
    let conf_path = format!("{}/.test/conf.toml", hd_str);

    let (num_tokio_worker_threads, max_tokio_blocking_threads) = (num_cpus::get(), 512); // 512 is tokio's current default
    println!("{}", num_tokio_worker_threads);
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(8 * 1024 * 1024)
        .worker_threads(num_tokio_worker_threads)
        .max_blocking_threads(max_tokio_blocking_threads)
        .build()?;

    let config = Arc::new(RwLock::new(conf::Conf::load_conf(conf_path.as_ref()).unwrap()));
    let config_cloned = Arc::clone(&config);

    let cp_cloned = conf_path.clone();

    tokio::task::spawn(async move {
        async_watch(&cp_cloned, config_cloned);
    });


    rt.block_on(async move {
        tokio::time::sleep(Duration::from_secs(60)).await;
    });
    println!("{:?}", config.read().unwrap().listen_port);
    Ok(())
}


fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
    let (mut tx, rx) = channel(1);

    // Automatically select the best implementation for your platform.
    // You can also access each implementation directly e.g. INotifyWatcher.
    let watcher = RecommendedWatcher::new(move |res| {
        tx.blocking_send(res).unwrap()
    })?;

    Ok((watcher, rx))
}

async fn async_watch<P: AsRef<Path>>(path: P, config_cloned: Arc<RwLock<Conf>>) {
    let (mut watcher, mut rx) = async_watcher().unwrap();

    // Add a path to be watched. All files and directories at that path and
    // below will be monitored for changes.
    watcher.watch(Path::new(path.as_ref().clone().to_str().unwrap()), RecursiveMode::Recursive).unwrap();

    while let Some(res) = rx.recv().await {
        match res {
            Ok(event) => {
                if event.kind == EventKind::Modify(ModifyKind::Any) {
                    match conf::Conf::load_conf(path.as_ref().clone()) {
                        Ok(new_config) => *config_cloned.write().unwrap() = new_config,
                        Err(e) => {
                            println!("{}", e)
                        }
                    }
                }
            }
            Err(e) => println!("watch error: {:?}", e),
        }
    }
}

conf.rs

use std::sync::Arc;
use std::path::{Path, PathBuf};
use std::fs::File;
use std::io::Read;
use tokio::sync::RwLock;
use serde::{Deserialize};

#[derive(Debug, Deserialize)]
pub struct Conf {
    pub listen_port: u16,
}

impl Conf {
    pub fn load_conf(conf_path: &Path) -> Result<Conf, anyhow::Error> {
        match File::open(conf_path) {
            Ok(mut file) => {
                let mut str_val = String::new();
                match file.read_to_string(&mut str_val) {
                    Ok(_) => {
                        match toml::from_str::<Conf>(&str_val) {
                            Ok(c) => {
                                Ok(c)
                            }
                            Err(e) => {
                                Err(anyhow::Error::msg(format!("load conf failed with error: {}", e)))
                            }
                        }
                    }
                    Err(e) => {
                        Err(anyhow::Error::msg(format!("load conf failed with error: {}", e)))
                    }
                }
            }
            Err(e) => {
                Err(anyhow::Error::from(e))
            }
        }
    }
}

[package]
name = "explorer"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix = "0.10"
actix-web = "4"
actix-web-actors = "4.1.0"
anyhow = "1.0.23"
dirs = "2.0.2"
toml = "^0.5"
crossbeam-channel = "0.4.0"

[dependencies.num_cpus]
version = "1"

[dependencies.serde]
version = "1"
features = ["derive"]

[dependencies.tokio]
version = "1.18.1"
features = ["full"]

[dependencies.notify]
notify = "5.0.0-pre.15"
features = ["default"]

Your call to tokio::task::spawn happened outside of a Tokio runtime. Try using rt.spawn instead.

Anyway, there seems to be some issues related to blocking in your code. Please read this.