Tokio Futures / Timer bug

I’m working on a Logstash competitor and having an issue with tokio::timer::Interval. I currently have it set up to run just two Input plugins at once, both on intervals: one just generates a test message input and the other, HttpPoller polls a URI.

The problem is that sometimes when I run the program, only HttpPoller message come through. Other times is just the Generator messages. Other times it’s both. Other times none come through. I just restart the program and it’s different every time.

It sounds like there is something wrong with the Futures / Streams I am using for this, but I’m not sure what. I tried making changes suggested by others, but they didn’t resolve the issue, so I’m not sure if I’m just misunderstanding something or if it’s a different problem.

Any help would be greatly appreciated. I’ve been trying to figure this issue out for a while now but not getting anywhere.

Points of interest:

Thank you

Calling poll here seems suspect. For each message from the passed receiver you’re creating a Stream, polling it once, then discarding it even if it hasn’t completed. You likely want to be using another for_each here and returning that from the outer for_each so that it will drive that inner stream to completion.

1 Like

What should I use for_each on instead? AndThen is just a Future, so I can’t call for_each on it.

I tried this thinking that spawning it instead of polling once would fix it, but the bug is still there.

I am wondering if it isn’t actually the FilterBlock that is causing this, but something with the Inputs. If I add a debug log message to both Generator and HttpPoller input plugins and run the program, if the HttpPoller is the one that works, I get this:

     Running `target/debug/emmett`
[2019-06-03T11:24:46Z DEBUG emmett::pipeline] Running InputBlock
[2019-06-03T11:24:46Z DEBUG emmett::pipeline::inputs] Spawned a new Input plugin.
[2019-06-03T11:24:46Z DEBUG emmett::pipeline::inputs] Spawned a new Input plugin.
[2019-06-03T11:24:46Z DEBUG emmett::pipeline] Running FilterBlock
[2019-06-03T11:24:46Z DEBUG emmett::pipeline] Running OutputBlock
[2019-06-03T11:24:46Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T11:24:46Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T11:24:46Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T11:24:46Z DEBUG emmett::pipeline::inputs::http_poller] Polled HttpPoller input.
[2019-06-03T11:24:47Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T11:24:47Z DEBUG emmett::pipeline::inputs::http_poller] Polled HttpPoller input.
[2019-06-03T11:24:47Z DEBUG emmett::pipeline::inputs::http_poller] HttpPoller timer is ready.
[2019-06-03T11:24:47Z DEBUG emmett::pipeline::inputs::http_poller] Sending http request.
[2019-06-03T11:24:47Z DEBUG hyper::client::connect::dns] resolving host="jsonplaceholder.typicode.com"

That shows the Generator plugin is being polled here: https://gitlab.com/andrewbanchich/emmett/blob/dev/src/pipeline/inputs.rs#L16

But then it just stops and I never see the polling happen again.

If it were an issue with the FilterBlock, shouldn’t the Generator Input Stream still be getting polled regularly?

Oh, I didn’t notice the fold is already turning your Stream back into a Future, in that case you should be able to just return the result of the and_then as the closure result so that the ForEach will run it to completion.

I can’t see anything obviously wrong with how the input streams are running, so not sure why they would stop. You could try putting your log-level up to TRACE and you should get quite a bit of output from Tokio that might help diagnose it.

So if I change it to this:

impl FilterBlock {
    pub fn run(self, receiver: Receiver<Value>) -> Receiver<Value> {

        let (filter_sender, output_receiver) = channel(1_024);

        let filter_stream = receiver.for_each(move |message| {

            debug!("FilterBlock received a message.");

            let filter_sender = filter_sender.clone();

            let stream = stream::iter_ok::<_, ()>(self.0.to_owned())
                .fold(message, |acc, curr| {
                    lazy(|| {
                        match curr {
                            Filter::Json(p) => p.process(acc),
                            Filter::Mutate(p) => p.process(acc)
                        }
                    })
                }).and_then(move |message| {
                    debug!("FilterBlock preparing to send a message.");
                    filter_sender.send(message).map_err(|_| ()).poll();
                    // debug!("FilterBlock sent a message.");
                    Ok(())
                });

            stream
                
        });

        tokio::spawn(filter_stream);
        
        output_receiver
            
    }
}

it still has the same bug.

Running with TRACE doesn’t seem to give me anything I can see that is relevant. There is still just the fist log output (from my own debug! log that HttpPoller was polled, but that’s where it stops. It never shows that again.

    Finished dev [unoptimized + debuginfo] target(s) in 0.60s
     Running `target/debug/emmett`
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE tokio_threadpool::builder] build; num-workers=12
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=1
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]     -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] shutdown; state=pool::State { lifecycle: Running, num_futures: 1 }
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]   -> transitioned to shutdown
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(11)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=11
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z DEBUG emmett::pipeline] Running InputBlock
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Spawned a new Input plugin.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=2
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]     -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=10
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Spawned a new Input plugin.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=3
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]     -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=9
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=9
[2019-06-03T22:37:38Z DEBUG emmett::pipeline] Running FilterBlock
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]     -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=8
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=8
[2019-06-03T22:37:38Z DEBUG emmett::pipeline] Running OutputBlock
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]     -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(10)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=10
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool]     -> submit external
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs::http_poller] Polled HttpPoller input.
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=6
[2019-06-03T22:37:38Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=9; from=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task]     -> task complete
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] task complete; state=pool::State { lifecycle: ShutdownOnIdle, num_futures: 5 }
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=4
[2019-06-03T22:37:38Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=3
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=8; from=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=3
[2019-06-03T22:37:38Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=2
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=2
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(9)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(7)
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=9
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(11)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=9
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(8)
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=11
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=8
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=8
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(6)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(4)
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(3)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(5)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=3
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=3
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=5
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(2)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=2
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=2
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=9
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::notifier] Notifier::notify; id=0x7f99e3d16280
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool]     -> submit internal; idx=9
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=9
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs::generator] Generator input timer is ready.
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs] Received message from Input plugin.
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::notifier] Notifier::notify; id=0x7f99e3e00f40
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool]     -> submit internal; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=9
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=5
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=9
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=2; from=9
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=5
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::filters] FilterBlock received a message.
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(2)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(3)
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=3
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::filters] FilterBlock preparing to send a message.
[2019-06-03T22:37:40Z TRACE tokio_threadpool::notifier] Notifier::notify; id=0x7f99e3f009f0
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool]     -> submit internal; idx=5
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=5; from=2
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=3
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(5)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]   sleeping -- push to stack; idx=5
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> wakeup; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker]     -> starting to sleep; idx=5
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::outputs] OutputBlock received a message.
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(2)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task]     -> not ready
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::outputs::stdout] Stdout output plugin received a message.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.