"attempted to run an executor while another executor is already running" using Interval

Hi, I am getting the following panic:

/Users/froyer/.cargo/bin/cargo test --color=always --all -- --nocapture
    Finished dev [unoptimized + debuginfo] target(s) in 0.17s
     Running target/debug/deps/tmp_rust-5d05d41248b4738c

running 1 test
thread 'tokio-runtime-worker-0' panicked at 'Deserialize(Error(Json(Error("attempted to run an executor while another executor is already running", line: 0, column: 0))))', src/lib.rs:70:26
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
             at src/libstd/sys/unix/backtrace/tracing/gcc_s.rs:39
   1: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:71
   2: std::panicking::default_hook::{{closure}}
             at src/libstd/sys_common/backtrace.rs:59
             at src/libstd/panicking.rs:197
   3: std::panicking::default_hook
             at src/libstd/panicking.rs:211
   4: <std::panicking::begin_panic::PanicPayload<A> as core::panic::BoxMeUp>::get
             at src/libstd/panicking.rs:474
   5: std::panicking::continue_panic_fmt
             at src/libstd/panicking.rs:381
   6: std::panicking::try::do_call
             at src/libstd/panicking.rs:336
   7: tmp_rust::tests::test_schedule::{{closure}}
             at src/lib.rs:70
   8: core::result::Result<T,E>::map_err
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libcore/result.rs:522
   9: <futures::future::map_err::MapErr<A,F> as futures::future::Future>::poll
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/future/map_err.rs:34
  10: hashbrown::raw::bitmask::BitMask::invert
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/future/mod.rs:113
  11: futures::task_impl::NotifyHandle::new
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/task_impl/mod.rs:329
  12: futures::task_impl::NotifyHandle::new
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/task_impl/mod.rs:399
  13: futures::task_impl::std::CURRENT_THREAD_NOTIFY::__init
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/task_impl/std/mod.rs:83
  14: futures::task_impl::NotifyHandle::new
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/task_impl/mod.rs:399
  15: futures::task_impl::NotifyHandle::new
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/task_impl/mod.rs:291
  16: futures::task_impl::NotifyHandle::new
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.28/src/task_impl/mod.rs:329
  17: tokio_threadpool::task::Task::run::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/task/mod.rs:145
  18: core::ops::function::FnOnce::call_once
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libcore/ops/function.rs:231
  19: <tokio_threadpool::pool::state::Lifecycle as core::cmp::PartialEq>::eq
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panic.rs:315
  20: <tokio_threadpool::config::Config as core::clone::Clone>::clone
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panicking.rs:293
  21: panic_unwind::dwarf::eh::read_encoded_pointer
             at src/libpanic_unwind/lib.rs:85
  22: <tokio_threadpool::config::Config as core::clone::Clone>::clone
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panicking.rs:272
  23: <alloc::collections::CollectionAllocErr as core::convert::From<core::alloc::AllocErr>>::from
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panic.rs:394
  24: tokio_threadpool::task::Task::run
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/task/mod.rs:130
  25: tokio_threadpool::worker::Worker::run_task2
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:567
  26: tokio_threadpool::worker::Worker::run_task
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:459
  27: tokio_threadpool::worker::Worker::try_run_owned_task
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:390
  28: tokio_threadpool::worker::Worker::try_run_task
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:297
  29: tokio_threadpool::worker::Worker::with_current::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:241
  30: tokio::runtime::threadpool::builder::Builder::build::{{closure}}::{{closure}}::{{closure}}::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.22/src/runtime/threadpool/builder.rs:390
  31: <&usize as core::ops::bit::BitAnd<usize>>::bitand
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.11/src/timer/handle.rs:101
  32: tokio_timer::timer::handle::CURRENT_TIMER::__init
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:299
  33: tokio_timer::timer::handle::CURRENT_TIMER::__init
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:245
  34: <&usize as core::ops::bit::BitAnd<usize>>::bitand
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.11/src/timer/handle.rs:84
  35: tokio::runtime::threadpool::builder::Builder::build::{{closure}}::{{closure}}::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.22/src/runtime/threadpool/builder.rs:382
  36: tokio_timer::ms
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.11/src/clock/clock.rs:137
  37: tokio_timer::timer::handle::CURRENT_TIMER::__init
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:299
  38: tokio_timer::timer::handle::CURRENT_TIMER::__init
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:245
  39: tokio_timer::ms
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.11/src/clock/clock.rs:117
  40: tokio::runtime::threadpool::builder::Builder::build::{{closure}}::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.22/src/runtime/threadpool/builder.rs:381
  41: core::alloc::Layout::repeat
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.9/src/lib.rs:237
  42: tokio_timer::timer::handle::CURRENT_TIMER::__init
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:299
  43: tokio_timer::timer::handle::CURRENT_TIMER::__init
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:245
  44: core::alloc::Layout::repeat
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.9/src/lib.rs:217
  45: tokio::runtime::threadpool::builder::Builder::build::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.22/src/runtime/threadpool/builder.rs:380
  46: tokio_threadpool::callback::Callback::call
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/callback.rs:22
  47: tokio_threadpool::worker::Worker::do_run::{{closure}}::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:127
  48: tokio_threadpool::pool::Pool::rand_usize::RNG::__getit
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.8/src/global.rs:209
  49: std::sys::unix::mutex::Mutex::destroy
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:299
  50: std::sys::unix::mutex::Mutex::destroy
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:245
  51: tokio_threadpool::pool::Pool::rand_usize::RNG::__getit
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.8/src/global.rs:178
  52: tokio_threadpool::worker::Worker::do_run::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:125
  53: std::sys::unix::mutex::Mutex::destroy
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:299
  54: std::sys::unix::mutex::Mutex::destroy
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/local.rs:245
  55: tokio_threadpool::worker::Worker::do_run
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/worker/mod.rs:116
  56: tokio_threadpool::pool::Pool::spawn_thread::{{closure}}
             at /Users/froyer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.15/src/pool/mod.rs:344
  57: crossbeam_epoch::collector::LocalHandle::is_pinned
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/sys_common/backtrace.rs:136
  58: std::sync::once::Once::is_completed
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/mod.rs:470
  59: <tokio_threadpool::pool::state::Lifecycle as core::cmp::PartialEq>::eq
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panic.rs:315
  60: <tokio_threadpool::config::Config as core::clone::Clone>::clone
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panicking.rs:293
  61: panic_unwind::dwarf::eh::read_encoded_pointer
             at src/libpanic_unwind/lib.rs:85
  62: <tokio_threadpool::config::Config as core::clone::Clone>::clone
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panicking.rs:272
  63: <alloc::collections::CollectionAllocErr as core::convert::From<core::alloc::AllocErr>>::from
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/panic.rs:394
  64: std::sync::once::Once::is_completed
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libstd/thread/mod.rs:469
  65: core::ops::function::FnOnce::call_once{{vtable.shim}}
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/libcore/ops/function.rs:231
  66: <alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/liballoc/boxed.rs:704
  67: std::sys::unix::thread::Thread::new::thread_start
             at /rustc/a53f9df32fbb0b5f4382caaad8f1a46f36ea887c/src/liballoc/boxed.rs:704
             at src/libstd/sys_common/thread.rs:13
             at src/libstd/sys/unix/thread.rs:79
  68: _pthread_body
  69: _pthread_start
test tests::test_schedule ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

   Doc-tests tmp_rust

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

With this code:

#![warn(unused_extern_crates, missing_debug_implementations, rust_2018_idioms)]
#![deny(unsafe_code)]

use futures::stream::Stream;
use reqwest::Response;
use serde::de::DeserializeOwned;
use std::time::{Duration, Instant};
use tokio::timer::Interval;

pub trait Fetch {
    fn fetch(&self) -> Result<Response, reqwest::Error>;
}

#[derive(Debug)]
pub enum Error {
    Deserialize(reqwest::Error),
    Network(reqwest::Error),
    Tokio(tokio::timer::Error),
}

pub fn schedule<'de, F: Fetch, R>(
    fetch: F,
    fetch_interval: Duration,
) -> impl Stream<Item = R, Error = Error>
where
    R: DeserializeOwned,
{
    Interval::new(Instant::now(), fetch_interval)
        .map_err(Error::Tokio)
        .and_then(move |_| fetch.fetch().map_err(Error::Network))
        .and_then(|mut response| response.json::<R>().map_err(Error::Deserialize))
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::Stream;
    use reqwest::Response;
    use serde::Deserialize;
    use futures::Future;

    #[derive(Debug, Deserialize, PartialEq)]
    struct HelloWorld {
        hello: String,
    }

    struct Fetcher {}

    impl Fetch for Fetcher {
        fn fetch(&self) -> Result<Response, reqwest::Error> {
            Ok(Response::from(http::Response::new(r#"{"hello": "world"}"#)))
        }
    }

    #[test]
    fn test_schedule() {
        let fetch = Fetcher {};

        let scheduler = schedule(fetch, Duration::from_millis(100))
            .take(2)
            .for_each(move |response: HelloWorld| {
                assert_eq!(
                    response,
                    HelloWorld {
                        hello: "world".into()
                    }
                );
                Ok(())
            })
            .map_err(|e| panic!("{:?}", e));

        tokio::runtime::run(scheduler);
    }

}

Could you please help me find out:

  1. How can avoid the panic? What am I doing wrong?
  2. How can make the test fail when it panics?

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.