Tokio futures closure - what is it trying to move?


#1

I’m trying to make a stream of futures that yield results at timed intervals; think of it a little like frames in an animation. I’m generating a stream with content, and prepending a delay.and_then(…) to each item. There’s a bunch of map_err slapped around to fit things together roughly, but it works fine until I try to do anything other than print the data.

extern crate futures;
extern crate rand;
extern crate tokio;
#[macro_use]
extern crate lazy_static;

use futures::future;
use futures::stream;
use rand::Rng;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::prelude::*;
use tokio::timer::Delay;

lazy_static! {
    static ref SOCK: UdpSocket =
        UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).expect("Can't bind address");
    static ref ADDR: SocketAddr = "192.168.21.199:9988".parse().unwrap();
}

fn main() {
    let mut buf = [0u8; 16];

    let s = stream::unfold(0, move |state| {
        for mut i in buf.iter_mut() {
            *i = i.wrapping_add(state);
        }
        let spc = rand::thread_rng().gen_range(100, 600);
        let del = Delay::new(Instant::now() + Duration::from_millis(spc));
        if state > 250 {
            return None;
        };
        let fut = future::ok((buf, state)); 
        Some(del.join(fut).map(|(_, b)| b).map_err(|_| ()))
    });

    let k = s.and_then(|b| SOCK.send_dgram(b, &ADDR).map_err(|_| ()))
        .for_each(|_| Ok(()));

    tokio::run(k);
}

But I get:

error[E0507]: cannot move out of borrowed content
  --> src/main.rs:40:28
   |
40 |     let k = s.and_then(|b| SOCK.send_dgram(b, &ADDR).map_err(|_| ()))
   |                            ^^^^ cannot move out of borrowed content

I need help understanding this error. I get the concept but am not seeing how it applies, and thus how to fix it:

  • what is it trying to move? The only thing it’s highlighting is the SOCK and I’m just trying to call a method on that.
  • where is it borrowed previously?

The lazy_static! is an attempt to get rid of previous lifetime issues around ‘doesn’t live long enough’, but it didn’t help with this one. Changing the s.and_then closure to use move |b| makes no difference. I’m pretty sure I only need one buffer and don’t need to copy/clone it, because unfold says it will wait for one future to resolve before yielding the next.

More generally, other tips would be welcome:

  • is there a neater way to construct the stream of futures?
  • I wanted to try sending the stream to a sink on the udp socket but couldn’t figure out the framed stuff yet.
  • I eventually want to be able to hand the stream to something that will play it, but be able to cancel the stream based on other events hopefully just by dropping / replacing it with another.

#2

The move is happening in SOCK.send_dgram() because send_dgram() takes self as an argument. The borrow is because SOCK is not a value you own in main() but rather a borrow of the static.

You should not have a static UdpSocket - it should be something you create inside main. Every time you call send_dgram() on it, it gets moved. However, you get it back once the future representing the send_dgram operation resolves. That same future will give you the buffer back so you can reuse it.
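
The move-and-return shape can be sketched without tokio. This is only an illustration: FakeSocket and its synchronous send_dgram are hypothetical stand-ins (not the tokio API), chosen so the ownership round trip is visible on its own.

```rust
/// Hypothetical stand-in for a socket whose send method consumes `self`.
/// It mimics the shape described above; it is not the tokio type.
pub struct FakeSocket {
    pub sent: usize,
}

impl FakeSocket {
    /// Takes the socket and the buffer by value and hands both back,
    /// the way the resolved send future yields `(socket, buffer)`.
    pub fn send_dgram(mut self, buf: [u8; 16]) -> (FakeSocket, [u8; 16]) {
        self.sent += buf.len();
        (self, buf)
    }
}

pub fn send_twice(sock: FakeSocket, buf: [u8; 16]) -> FakeSocket {
    // Calling through a shared reference is what triggers E0507:
    // let borrowed = &sock;
    // borrowed.send_dgram(buf); // cannot move out of a borrow

    // With an owned socket, each call moves it in and gets it back.
    let (sock, buf) = sock.send_dgram(buf);
    let (sock, _buf) = sock.send_dgram(buf);
    sock
}
```

Calling send_twice(FakeSocket { sent: 0 }, [0u8; 16]) returns a socket that has counted both sends.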

Canceling a stream can be done a few ways. One is to select between the stream replay future (e.g. for_each()) and another future that is the receiver of a oneshot channel; sending a message on that channel makes that future complete, which cancels the stream replay. You can also cancel it by returning an Err from something like for_each, which terminates the stream with an error.
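
As a rough picture of the channel-based cancel, here is a blocking, std-only analogue. It swaps the futures select and oneshot channel for a plain loop that polls std::sync::mpsc, so it shows the idea rather than the tokio wiring; play_until_cancelled is a hypothetical name.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Plays frames in order and returns how many were played
/// before a cancel message arrived on the channel.
pub fn play_until_cancelled(frames: &[[u8; 16]], cancel: &Receiver<()>) -> usize {
    let mut played = 0;
    for _frame in frames {
        match cancel.try_recv() {
            // A message, or a dropped sender, both mean "stop".
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            // Nothing received yet: keep playing.
            Err(TryRecvError::Empty) => {}
        }
        // A real player would send `_frame` here.
        played += 1;
    }
    played
}
```

In the futures version the check is not written by hand: whichever of the two selected futures completes first wins, and the other is dropped.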


#3

> The move is happening in SOCK.send_dgram() because send_dgram() takes self as an argument.

Right, of course. I think I did see at one point that it took an owned self, rather than a borrow. That does explain why if I use the ‘blocking’ std::net version of udp (where send() takes &self) it works fine. Of course one is not supposed to do that, even if in practice it will rarely if ever really block.

> The borrow is because SOCK is not a value you own in main() but rather a borrow of the static.

Oh! This was the detail I was missing. Thank you.

> You should not have a static UdpSocket - it should be something you create inside main.

Yeah, but that’s where I started, and I had to shift it out. Before they were static, I’d get a “might outlive” for the addr, and the following for the sock.

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/main.rs:38:28
   |
21 |     let sock = UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).expect("Can't bind address");
   |         ---- captured outer variable
...
38 |     let k = s.and_then(|b| sock.send_dgram(b, &ADDR).map_err(|_| ()))
   |                            ^^^^ cannot move out of captured outer variable in an `FnMut` closure

Making the closure move also fixed addr without giving it a static lifetime, but not sock, so then I tried making them both static.

This is essentially the same error as before, so I don’t see what to do here. I don’t want to recreate the socket every time around the iter, but that’s the only place I can see to put it to avoid it being a captured variable. It makes no sense to pass it all the way along the futures chain. Clearly I need some kind of different construct.
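
For anyone reading along, the FnMut restriction can be reproduced without futures. The sketch below uses a hypothetical FakeSocket and one general Rust workaround (parking the value in an Option and taking it out for each call); it is not necessarily the best fit for a futures chain.

```rust
/// Hypothetical stand-in: `send_dgram` consumes the socket and returns it.
pub struct FakeSocket {
    pub sent: usize,
}

impl FakeSocket {
    pub fn send_dgram(mut self, buf: &[u8]) -> FakeSocket {
        self.sent += buf.len();
        self
    }
}

pub fn send_all(frames: &[[u8; 16]]) -> usize {
    // A closure that may run many times cannot give away a captured value:
    // let sock = FakeSocket { sent: 0 };
    // frames.iter().for_each(|b| { sock.send_dgram(b); }); // E0507

    // Workaround: keep the socket in an Option, take it out for the call,
    // and put the returned socket back before the closure finishes.
    let mut slot = Some(FakeSocket { sent: 0 });
    frames.iter().for_each(|b| {
        let sock = slot.take().expect("socket is always put back");
        slot = Some(sock.send_dgram(b));
    });
    slot.map(|s| s.sent).unwrap_or(0)
}
```

The closure only ever borrows slot mutably, so it can be called repeatedly, while the socket itself is still moved into and out of each send.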


#4

Maybe like this:

s.fold(socket, move |socket, b| {
    socket.send_dgram(b, &ADDR).map(move |(ret_socket, _)| ret_socket)
})
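
The same threading idea exists on ordinary iterators, which may make the shape easier to see. This is a synchronous sketch with a hypothetical FakeSocket, using Iterator::fold rather than the stream combinator:

```rust
/// Hypothetical stand-in for a socket whose send consumes and returns it.
pub struct FakeSocket {
    pub sent: usize,
}

impl FakeSocket {
    pub fn send_dgram(mut self, buf: [u8; 16]) -> (FakeSocket, [u8; 16]) {
        self.sent += buf.len();
        (self, buf)
    }
}

/// The socket is the accumulator: each step receives it by value,
/// uses it, and returns it for the next step.
pub fn send_frames(socket: FakeSocket, frames: Vec<[u8; 16]>) -> FakeSocket {
    frames.into_iter().fold(socket, |socket, b| {
        let (ret_socket, _buf) = socket.send_dgram(b);
        ret_socket
    })
}
```

Because the accumulator is passed in and out by value, the closure never has to move anything out of its captured environment.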

#5

I’d give @fudini’s suggestion a try. Also, SocketAddr is a Copy type so you should be able to freely (and cheaply) copy/move it around.
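
A small std-only check of the Copy point (describe_twice is a made-up helper for illustration):

```rust
use std::net::SocketAddr;

pub fn describe_twice(addr: SocketAddr) -> (String, String) {
    // `move` copies `addr` into the closure because `SocketAddr` is `Copy`.
    let in_closure = move || format!("closure sends to {}", addr);
    // The original is still usable after being captured by value.
    let outside = format!("main still has {}", addr);
    (in_closure(), outside)
}
```

So a move closure takes its own copy of the address and the outer binding remains valid.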


#6

Thanks. That looks like a way of passing it along the futures chain, but at least I can inject it at the stream consumer side rather than at the beginning. A very quick try popped up more mismatched types, probably some more map_err needed; I’ll look more carefully later.

Assuming it does work, there still seems like an issue here. It doesn’t seem like it should be so… hard? intricate? to use futures-based IO inside futures chains.


#7

Yeah, as noted, moving worked for the addr, but since I then tried static I tried it for both sock and addr to keep them together.


#8

Futures/tokio really bring out the “worst” in Rust learnability - you really have to know how Rust and these APIs work. In particular, you have to be mindful of which methods take self and you need to be careful/deliberate with how closures get desugared - what’s borrowed? What needs to be moved? Need to remember to clone outside of a closure and then move the clone into a closure. And so on.
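
The "clone outside, move the clone in" habit looks like this in plain std code. A thread stands in for a spawned future here, and sum_on_worker is a hypothetical helper:

```rust
use std::sync::Arc;
use std::thread;

pub fn sum_on_worker(shared: &Arc<Vec<u8>>) -> u32 {
    // Clone the handle outside the closure...
    let for_worker = Arc::clone(shared);
    // ...then move only the clone in; `shared` stays usable by the caller.
    let handle = thread::spawn(move || {
        for_worker.iter().map(|&b| u32::from(b)).sum::<u32>()
    });
    handle.join().expect("worker thread panicked")
}
```

Cloning inside the closure would not help, because by then the closure has already captured the original.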

Don’t get discouraged but do prepare yourself to be patient :slight_smile:.


#9

:slight_smile: That’s expected - and part of why I chose this as a place to start.


#10

Here’s a working example in case you’ve not gotten it to compile (or someone reading along is curious):

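extern crate futures;
extern crate rand;
extern crate tokio;

use futures::stream;
use rand::Rng;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::prelude::*;
use tokio::timer::Delay;

fn main() {
    let mut buf = [0u8; 16];

    // Each step resolves to (frame, next state) after a random delay.
    let s = stream::unfold(0, move |state| {
        for i in buf.iter_mut() {
            *i = i.wrapping_add(state);
        }
        let spc = rand::thread_rng().gen_range(100, 600);
        let del = Delay::new(Instant::now() + Duration::from_millis(spc));
        if state > 250 {
            return None;
        };
        // Pass state + 1 on, so the stream ends once it passes 250.
        let fut = Ok((buf, state + 1));
        Some(del.join(fut).map(|(_, b)| b).map_err(|_| ()))
    });
    let addr: SocketAddr = "192.168.21.199:9988".parse().unwrap();
    // fold threads the socket through: each send consumes it, and the
    // resolved future hands it back for the next frame.
    let k = s.fold(UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).unwrap(), move |sock, b| {
        sock.send_dgram(b, &addr).map(|(sock, _)| sock).map_err(|_| ())
    }).map(|_| ());

    tokio::run(k);
}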

Note your snippet before didn’t increment state in the unfold combinator.
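
To see why the increment matters, here is a synchronous, std-only analogue of the unfold closure (no delay, no tokio; step and frame_count are hypothetical names). The closure returns the item together with the next state, and the sequence ends only when it returns None:

```rust
/// Mirrors the unfold closure: returns the frame for this state
/// plus the next state, or `None` to end the sequence.
pub fn step(buf: &mut [u8; 16], state: u8) -> Option<([u8; 16], u8)> {
    for i in buf.iter_mut() {
        *i = i.wrapping_add(state);
    }
    if state > 250 {
        return None;
    }
    // Returning `state` here instead of `state + 1` would never terminate.
    Some((*buf, state + 1))
}

/// Drives `step` the way unfold does and counts the frames produced.
pub fn frame_count() -> usize {
    let mut buf = [0u8; 16];
    let mut state = 0u8;
    let mut count = 0;
    while let Some((_frame, next)) = step(&mut buf, state) {
        state = next;
        count += 1;
    }
    count
}
```

With the increment in place the states run from 0 to 250, so 251 frames come out before the sequence stops.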


#11

Yep. That last map to () took me a while, but that’s what I came up with too. I’ve learned several useful things, including how to read the rather long compiler type mismatch errors in the midst of futures chains. Thanks @vitalyd and @fudini!

I still have the sock at the top rather than declared inline; as well as being what I had already, it feels more obvious that the sock isn’t getting created each time without having to carefully check the fold signature. It’s not exactly pretty, either way.

The next thing is to look at the sink / framed / encoder stuff. That will also fix the ‘amusing’ error you get if buf changes from 16 to 160 bytes, which is what it needs to be. It’s kinda clear why this happens, and I hid it while trying to solve this problem, but it’s oh so cheesy:

error[E0277]: the trait bound `[u8; 160]: std::convert::AsRef<[u8]>` is not satisfied
  --> src/main.rs:37:16
   |
37 |         socket.send_dgram(b, &addr).map(move |(r, _)| r).map_err(|_| ())
   |                ^^^^^^^^^^ the trait `std::convert::AsRef<[u8]>` is not implemented for `[u8; 160]`
   |
   = help: the following implementations were found:
             <[T; 19] as std::convert::AsRef<[T]>>
             <[T; 9] as std::convert::AsRef<[T]>>
             <[T; 11] as std::convert::AsRef<[T]>>
             <[T; 30] as std::convert::AsRef<[T]>>
           and 30 others

error[E0599]: no method named `map` found for type `tokio::net::SendDgram<[u8; 160]>` in the current scope
  --> src/main.rs:37:37
   |
37 |         socket.send_dgram(b, &addr).map(move |(r, _)| r).map_err(|_| ())
   |                                     ^^^
   |
   = note: the method `map` exists but the following trait bounds were not satisfied:
           `tokio::net::SendDgram<[u8; 160]> : futures::Future`
           `&mut tokio::net::SendDgram<[u8; 160]> : futures::Stream`
           `&mut tokio::net::SendDgram<[u8; 160]> : futures::Future`
           `&mut tokio::net::SendDgram<[u8; 160]> : std::iter::Iterator`

:blush: It was there originally, but got lost in the last thing I tried before asking for help, which was to put the buf and count into a struct and pass that down as the state for the unfold. Didn’t help, because it wasn’t an issue with borrowing buf, as we now know.


#12

This is why it’s fair to say that arrays are second-class citizens for the time being :slight_smile:. Hopefully once const (int) generics are available this will not be an issue anymore.
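
A minimal sketch of the bound in question, with a hypothetical payload_len helper. It assumes a compiler recent enough to implement AsRef<[T]> for arrays of any length; on the compiler used in this thread only the Vec<u8> call would be accepted for 160 bytes.

```rust
/// Accepts anything that can be viewed as a byte slice,
/// the same kind of bound that rejected `[u8; 160]` above.
pub fn payload_len<T: AsRef<[u8]>>(buf: T) -> usize {
    buf.as_ref().len()
}

pub fn demo() -> (usize, usize) {
    // Array of 160 bytes: needs the any-length array impls.
    let from_array = payload_len([0u8; 160]);
    // Vec<u8> satisfies the bound regardless of length.
    let from_vec = payload_len(vec![0u8; 160]);
    (from_array, from_vec)
}
```

Until the array impls cover the size you need, a Vec<u8> is a simple way to get a buffer that satisfies the bound.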


#13

Huh. I pulled in futures-0.2 and it seems I’ll get more practice at this :face_with_raised_eyebrow:


#14

A week or two of reading issues and RFCs and other exploration later, and I’ve learned why I can’t fix these right now.

I was starting to get discouraged, in a narrow sense: it looked like I’d have to wait longer for the ecosystem to shake out a lot of nested complex issues, and I should just stick to futures 0.1 for the foreseeable future. This is what I just read this morning, and what motivated me to come back here and update the thread:

After reading all the RFCs and discussions, this post consolidated for me why async/await seems to be such a focus among the experts. I hadn’t really understood why what sounded like syntactic sugar was seen as so important. I’ve come from many many years with Perl’s AnyEvent framework (callback closure based, basically node.pl) and sometimes Promises on top of that. I didn’t feel the need to wait for additional “convenience” before learning to use async Rust in a programming style I was already very comfortable with. I hit some of the issues (as with the original question here) but still didn’t see the bigger picture.

Anyway, it’s a short and very clear post and I commend it to you if you’ve read this far.