So if I change it to this:
impl FilterBlock {
pub fn run(self, receiver: Receiver<Value>) -> Receiver<Value> {
let (filter_sender, output_receiver) = channel(1_024);
let filter_stream = receiver.for_each(move |message| {
debug!("FilterBlock received a message.");
let filter_sender = filter_sender.clone();
let stream = stream::iter_ok::<_, ()>(self.0.to_owned())
.fold(message, |acc, curr| {
lazy(|| {
match curr {
Filter::Json(p) => p.process(acc),
Filter::Mutate(p) => p.process(acc)
}
})
}).and_then(move |message| {
debug!("FilterBlock preparing to send a message.");
filter_sender.send(message).map_err(|_| ()).poll();
// debug!("FilterBlock sent a message.");
Ok(())
});
stream
});
tokio::spawn(filter_stream);
output_receiver
}
}
it still has the same bug.
Running with TRACE
doesn't seem to give me anything I can see that is relevant. There is still just the fist log output (from my own debug!
log that HttpPoller
was polled, but that's where it stops. It never shows that again.
Finished dev [unoptimized + debuginfo] target(s) in 0.60s
Running `target/debug/emmett`
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE tokio_threadpool::builder] build; num-workers=12
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=1
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] shutdown; state=pool::State { lifecycle: Running, num_futures: 1 }
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> transitioned to shutdown
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(11)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=11
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE mio::sys::unix::kqueue] registering; token=Token(18446744073709551615); interests=Readable
[2019-06-03T22:37:38Z TRACE mio::poll] registering with poller
[2019-06-03T22:37:38Z DEBUG emmett::pipeline] Running InputBlock
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Spawned a new Input plugin.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=2
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=10
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Spawned a new Input plugin.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=3
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=9
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=9
[2019-06-03T22:37:38Z DEBUG emmett::pipeline] Running FilterBlock
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=8
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=8
[2019-06-03T22:37:38Z DEBUG emmett::pipeline] Running OutputBlock
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> submit external
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(10)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=10
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::sender] execute; count=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] -> submit external
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs::http_poller] Polled HttpPoller input.
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:38Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=6
[2019-06-03T22:37:38Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=9; from=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] -> task complete
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] task complete; state=pool::State { lifecycle: ShutdownOnIdle, num_futures: 5 }
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=4
[2019-06-03T22:37:38Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=3
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=8; from=10
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=3
[2019-06-03T22:37:38Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=2
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::pool] signal_work -- spawn; idx=2
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:38Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(9)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(7)
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=9
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(11)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=7
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=9
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=11
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(8)
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=11
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=8
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=8
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(6)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=6
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(4)
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(3)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(5)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=4
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=3
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=5
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=3
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=5
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(2)
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=2
[2019-06-03T22:37:38Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=2
[2019-06-03T22:37:38Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=9
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::notifier] Notifier::notify; id=0x7f99e3d16280
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] -> submit internal; idx=9
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=9
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs::generator] Generator input timer is ready.
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs] Received message from Input plugin.
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::notifier] Notifier::notify; id=0x7f99e3e00f40
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] -> submit internal; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=9
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=5
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs] Polling Input plugins.
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=9
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::inputs::generator] Polled Generator input plugin.
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=2; from=9
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=5
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::filters] FilterBlock received a message.
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(2)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(3)
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=3
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::filters] FilterBlock preparing to send a message.
[2019-06-03T22:37:40Z TRACE tokio_threadpool::notifier] Notifier::notify; id=0x7f99e3f009f0
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] -> submit internal; idx=5
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=3
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] try_steal_task -- signal_work; self=5; from=2
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::pool] signal_work -- notify; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=3
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_reactor] event Readable Token(4194303)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(5)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] stole task from another worker
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 1 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] sleeping -- push to stack; idx=5
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] Task::run; state=Running
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> wakeup; idx=2
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] -> starting to sleep; idx=5
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::outputs] OutputBlock received a message.
[2019-06-03T22:37:40Z TRACE tokio_reactor] loop process - 0 events, 0.000s
[2019-06-03T22:37:40Z TRACE tokio_threadpool::worker] Worker::sleep; worker=WorkerId(2)
[2019-06-03T22:37:40Z TRACE tokio_threadpool::task] -> not ready
[2019-06-03T22:37:40Z DEBUG emmett::pipeline::outputs::stdout] Stdout output plugin received a message.