I have some tokio code where it appears the receiver is getting dropped, and I don't quite understand what is going on. The (slightly sanitized) code looks like this:
pub fn process_points(
    thread_pool: Arc<Mutex<Runtime>>,
    recv: Receiver<Points>,
) -> impl Future<Item = (), Error = ()> {
    let (key_send, key_recv) = channel(100);
    thread_pool.lock().unwrap().spawn(read_keys(key_recv));
    recv.map_err(|err| error!("Failed to receive point: {:?}", err))
        .for_each(move |payload| {
            let foo = process(payload);
            let sender = key_send.clone();
            thread_pool.lock().unwrap().spawn(
                sender
                    .send(keys)
                    .map(|_| ())
                    .map_err(|err| error!("Failed to write to channel: {:?}", err)),
            );
            Ok(())
        })
}

fn read_keys(recv: Receiver<HashSet<MinKey>>) -> impl Future<Item = (), Error = ()> {
    recv.map_err(|err| error!("Failed to receive: {:?}", err))
        .and_then(move |keys| {
            do_stuff(&keys)
                .map(move |resolved| (keys, resolved))
                .map_err(|err| error!("Failed to do stuff: {:?}", err))
        })
        .and_then(move |(keys, resolved)| {
            do_more_stuff(&keys, &resolved)
                .map_err(|err| error!("Failed to do more stuff: {:?}", err))
        })
        .for_each(|_| Ok(()))
}
My logs are completely filled with 'Failed to write to channel: SendError(())'. I tried changing the error!() macros to panic!() to see whether an earlier failure was the cause, but the binary didn't crash and I didn't see any panics in my logs. There's clearly something I'm missing here.
I'm using tokio version 0.1.22 and cargo 1.40.0-nightly (3a9abe3f0 2019-10-15).
You never do anything with the key_send variable, so it is not moved into the future returned by process_points and is dropped when process_points returns.
I don't know where that sender variable comes from. I'd guess this would be a compile error, since it refers to a variable that doesn't exist.
Note that you can use the executor method on Runtime to get a TaskExecutor, which avoids the Arc<Mutex<Runtime>>. A TaskExecutor can be cloned cheaply, so it can be shared in many places.
After lots of trial and error, this seems to work as I want it:
recv.map_err(|err| error!("Failed to receive point: {:?}", err))
    .map(move |keys| {
        let resolved = do_stuff(&keys);
        Future::join(future::ok(keys), resolved)
    })
    .map_err(|err| error!("Failed to do stuff: {:?}", err))
    .map(move |res_fut| {
        res_fut.map(move |(keys, resolved)| do_more_stuff(&keys, &resolved))
            .map_err(|err| error!("Failed to do more stuff: {:?}", err))
    })
    .for_each(|_| Ok(()))
The nested maps are rather unpleasant, and I still don't understand why the and_then() version was dropping the receiver early. Any input into why this works and the other version doesn't would be very helpful for my learning.
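It is likely that the first version encountered some error and exited: in futures 0.1, for_each stops at the first error the stream yields, which drops the receiver, and every later send then fails with SendError. In the version you just posted using map, you never actually run the futures you create inside the calls to map.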
The first map, which returns the joined future, turns the stream into a Stream<Item = Future>, and the second map just takes that future and tells it to do some more work once it has finished. When you then reach the for_each, the future is bound to the argument you called _, and since you do nothing with it, it's just dropped. This means that no code after do_stuff actually does anything.
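To make the "dropped without running" point concrete, here is a minimal sketch. It uses std::future (the modern trait) rather than futures 0.1, purely so it compiles without external crates; the laziness it demonstrates is the same. The names do_work, poll_once, create_and_drop and create_and_poll are hypothetical and not from the original code.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll futures that never suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for `do_more_stuff`: bumps a counter when it runs.
fn do_work(counter: Arc<AtomicUsize>) -> impl Future<Output = ()> {
    async move {
        counter.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls a future a single time. That is sufficient here because
/// `do_work` completes on its first poll.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let result = fut.as_mut().poll(&mut cx);
    result
}

/// Creates `n` futures and drops each one unpolled, which is what
/// `for_each(|_| Ok(()))` does with the futures it is handed.
fn create_and_drop(n: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        let _ = do_work(counter.clone());
    }
    counter.load(Ordering::SeqCst)
}

/// Creates `n` futures and actually drives each one.
fn create_and_poll(n: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        let _ = poll_once(do_work(counter.clone()));
    }
    counter.load(Ordering::SeqCst)
}
```

Calling create_and_drop(3) leaves the counter at 0, while create_and_poll(3) brings it to 3: constructing a future performs none of its work until something polls it.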
The reason this seems to work is probably that by not doing the additional work, you never hit the error that was the issue before.
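The first failure mode (a consumer that exits on its first error, after which every send fails) can be reproduced in a few lines. This is only a sketch: it swaps in the synchronous std::sync::mpsc channel for the futures 0.1 channel so that it needs no external crates, and handle_strict, consume_until_error and run_demo are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical per-item work that fails on negative input.
fn handle_strict(item: i32) -> Result<(), String> {
    if item < 0 {
        Err(format!("bad item {}", item))
    } else {
        Ok(())
    }
}

/// Consumer that gives up at the first failing item, similar to how
/// `for_each` in futures 0.1 halts on the first stream error.
/// Returns how many items were handled successfully.
fn consume_until_error(recv: Receiver<i32>) -> usize {
    let mut handled = 0;
    for item in recv.iter() {
        if handle_strict(item).is_err() {
            break;
        }
        handled += 1;
    }
    handled
    // `recv` is dropped here, which closes the channel for all senders.
}

/// Returns (items handled before the error, sends that failed afterwards).
fn run_demo() -> (usize, usize) {
    let (send, recv) = channel::<i32>();
    // Queue some items; the third one makes the consumer bail out.
    for item in vec![1, 2, -1, 4] {
        send.send(item).unwrap();
    }
    let handled = consume_until_error(recv);
    // The receiver is gone, so every further send returns a SendError.
    let failed = (0..3).filter(|i| send.send(*i).is_err()).count();
    (handled, failed)
}
```

Here run_demo() returns (2, 3): two items are handled, the third triggers the early exit, and all three later sends fail. A single error surfaces as a flood of send failures in the same way.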
Thanks. You are indeed correct. I thought that a failure was causing the issue initially, but I couldn't find any failures in the logs. It must have gotten lost in the noise. I ended up just biting the bullet and converting the code to async/await with nightly and writing code that doesn't halt after one error.
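For readers wondering what "doesn't halt after one error" might look like, here is a rough synchronous sketch. It is not the async/await code described above (that code was not posted); it uses std::sync::mpsc as a stand-in, and handle_item, consume_all and run_resilient_demo are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical per-item work: doubles the input, fails on negatives.
fn handle_item(item: i32) -> Result<i32, String> {
    if item < 0 {
        Err(format!("bad item {}", item))
    } else {
        Ok(item * 2)
    }
}

/// Consumer that records each failure and keeps going, so the receiver
/// stays alive until every sender has been dropped.
fn consume_all(recv: Receiver<i32>) -> (Vec<i32>, Vec<String>) {
    let mut results = Vec::new();
    let mut errors = Vec::new();
    for item in recv {
        match handle_item(item) {
            Ok(value) => results.push(value),
            // In real code this is where the error would be logged.
            Err(err) => errors.push(err),
        }
    }
    (results, errors)
}

/// Returns (successful results, number of errors seen).
fn run_resilient_demo() -> (Vec<i32>, usize) {
    let (send, recv) = channel::<i32>();
    for item in vec![1, 2, -1, 4] {
        send.send(item).unwrap();
    }
    // Dropping the last sender is what ends the receive loop.
    drop(send);
    let (results, errors) = consume_all(recv);
    (results, errors.len())
}
```

run_resilient_demo() returns (vec![2, 4, 8], 1): the bad item is recorded as an error, and the item after it is still processed.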