[SOLVED] Tokio, closures, moving values, and "does not live long enough"

I am trying to implement a low-level message broker that uses as few resources as possible. To that end, I am trying to create the smallest possible proof of concept as a learning experience:

  1. Use tokio to send a stream of data across a TCP socket.
  2. Capture that data stream and, when the stream is done, add the finished result to a vector.

I have read through the tokio guide and most of the documentation, and ended up drawing from this example: tokio/print_each_packet.rs at master · tokio-rs/tokio · GitHub

So, onto the code and the problem. I declare a vector outside all the closures, and try to use that vector to hold the data from the open socket. Then, once the socket is closed, I use the completed message.

But, I get an error concerning the lifetimes of the data vector:

error[E0597]: `*data` does not live long enough
  --> src/main.rs:87:21
   |
85 |                 .for_each(|bytes| {
   |                           ------- capture occurs here
86 |                     println!("bytes: {:?}", bytes);
87 |                     data.push(bytes);
   |                     ^^^^ borrowed value does not live long enough
   |
   = note: borrowed value must be valid for the static lifetime...
note: ...but borrowed value is only valid for the lifetime  as defined on the body at 80:19
  --> src/main.rs:80:19
   |
80 |         .for_each(move |socket| {
   |                   ^^^^^^^^^^^^^

My code is below.

let addr = "127.0.0.1:6142".parse().unwrap();

let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);

// let manager = Arc::new(Mutex::new(Manager::new()));
// manager.lock().unwrap().add(String::from("one"));

// let m1 = Arc::clone(&manager);

let mut data = vec![];

let done = socket
    .incoming()
    .map_err(|e| println!("failed to accept socket; error = {:?}", e))
    .for_each(move |socket| {
        let framed = BytesCodec::new().framed(socket);
        let (_writer, reader) = framed.split();

        let processor = reader
            .for_each(|bytes| {
                println!("bytes: {:?}", bytes);
                data.push(bytes);
                Ok(())
            })
            // After our copy operation is complete we just print out some helpful
            // information.
            .and_then(|()| {
                println!("Socket received FIN packet and closed connection");
                println!("{:?}", data);
                Ok(())
            }).or_else(|err| {
                println!("Socket closed with error: {:?}", err);
                // We have to return the error to catch it in the next `.then` call
                Err(err)
            }).then(|result| {
                println!("Socket closed with result: {:?}", result);
                Ok(())
            });

        tokio::spawn(processor)
    });

tokio::run(done);
Ok(())

I think I understand what it's complaining about. I am moving data into closures which may outlive the function that data was declared in. Incidentally, this is the main() function. What is the way to satisfy these lifetimes?

Ultimately, I'm using a vector because I already have a data structure that acts like a vector, but is wrapped in Arc and Mutex (let manager = Arc::new(Mutex::new(Manager::new()));) to work across multiple threads.

In the end, this should accept messages from an incoming socket, decode the message, and insert the message into the manager (a queue). That's it.

As a side note, I've used other tokio examples and successfully moved the manager instance into closures without a problem.

For data.push(bytes); you need a mutable reference to data. The data object lives until the end of your outer for_each closure (because of the move keyword, every variable used in that closure is moved into it). However, with tokio::spawn(processor) you postpone evaluation of the whole processor until later, definitely outside of this for_each scope (in this code, say, until tokio::run(done); as far as I remember, that call blocks until the final result is computed).

There is no direct way to specify which variables you want to move into a closure and which you want to borrow. There is a trick: after you create your data structure, you can create a reference to it and use that in the closure (so only the reference is moved, not the whole object), like this: let data_ref = &mut data;. The one problem you may face is that you will not be able to call anything on data in the scope where data_ref is live, because while a mutable borrow exists you cannot have any other borrow. There are two solutions. The first is to enclose data_ref, done, and tokio::run in their own scope (so that outside of it you can easily use data as you want). The second is to enable NLL, which should also help: as long as data_ref is used only in the done closure, once that is moved into tokio::run, any later reference to data should be valid, I guess (that is more or less intuition; I still don't know exactly how NLL works).
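To make the reference trick concrete, here is a minimal sketch without tokio. The function name push_all_via_ref and the string inputs are made up for illustration. Note that the closure here finishes running before the function returns; a future handed to tokio::spawn has to be 'static, so a closure that holds a borrow like data_ref is not expected to satisfy that bound.

```rust
/// Hypothetical helper: pushes every chunk into a local vector through a
/// closure that captures only a mutable reference to that vector.
fn push_all_via_ref(chunks: &[&str]) -> Vec<String> {
    let mut data: Vec<String> = Vec::new();
    {
        // Take the mutable reference up front ...
        let data_ref = &mut data;
        // ... so that `move` moves just the reference, not `data` itself.
        let mut push = move |chunk: &str| data_ref.push(chunk.to_string());
        for &chunk in chunks {
            push(chunk);
        }
    } // `data_ref` (and the closure holding it) end here.

    // The mutable borrow is over, so `data` can be used directly again.
    data
}
```

Calling push_all_via_ref(&["a", "b"]) returns a vector holding the two strings. With NLL the inner braces should not be required, since the borrow ends at the closure's last use.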

Are you sure you want to buffer the data until the socket is closed? Typically, you'd implement a Codec that can decode a single frame (i.e. request), and then process that. If you're implementing a message broker, it seems surprising that a client connects, sends a message, and disconnects - I'd expect it to keep the connection alive in most cases, so that more messages can be sent later.
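As a rough idea of what "decode a single frame" means, here is a sketch over a plain byte buffer. The wire format (one length byte followed by that many payload bytes) and the name decode_frame are assumptions for illustration only; a real tokio Codec would work on a BytesMut and follow the Decoder trait.

```rust
/// Hypothetical framing: one length byte, then `len` payload bytes.
/// Returns `Some(payload)` and removes the frame from `buf` when a complete
/// frame is available; returns `None` and leaves `buf` untouched otherwise.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    // No length byte yet: nothing to decode.
    let len = *buf.first()? as usize;

    // The frame is incomplete; wait for more bytes to arrive.
    if buf.len() < 1 + len {
        return None;
    }

    // Remove the whole frame from the buffer and keep only the payload.
    let frame: Vec<u8> = buf.drain(..1 + len).skip(1).collect();
    Some(frame)
}
```

The caller keeps appending received bytes to buf and calls decode_frame in a loop until it returns None, handling each payload as one request.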

But, if you want to collect all the bytes up until the socket is closed, you can use concat2() for that, e.g.:

let processor = reader
    .concat2()
    // `data` will be a `BytesMut` holding all the bytes received on this conn
    .and_then(|data| {
        println!("Socket received FIN packet and closed connection");
        println!("{:?}", data);
        Ok(())
    })
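Conceptually, concat2() appends every chunk the stream yields to one growing buffer and hands you the buffer once the stream ends. A sketch of that idea over a plain iterator (this is not the futures implementation, and concat_chunks is a made-up name):

```rust
/// Hypothetical stand-in for the effect of `concat2()`: folds all chunks
/// into a single buffer, in arrival order.
fn concat_chunks<I>(chunks: I) -> Vec<u8>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    chunks.into_iter().fold(Vec::new(), |mut acc, chunk| {
        // Append this chunk to everything received so far.
        acc.extend_from_slice(&chunk);
        acc
    })
}
```

Because the accumulator is passed along by value, no closure needs to hold a reference to a variable declared outside the chain, which is why this approach sidesteps the lifetime error.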

@vitalyd, thank you. You are right (as I look at it) that I do not want to buffer the data until the socket is closed.

@hashedone, thank you for helping me understand what is actually happening. However, I tried using a reference and the scope trick (a couple variations). None of them quite worked.

I do (or think I do) have NLL active. I am using Rust 1.30.1.

Let me be more specific about what I am ultimately trying to accomplish and then post my cleaned up code.

I have a manager (basically a wrapper around VecDeque) that shares data across multiple threads (from examples in The Book) using Arc and Mutex. I have tested that and it works well.

I will open up many listening sockets. As a socket gets a request, it simply pushes that request onto the queue in manager. Another set of sockets is listening for requests to read from manager. All this is done in tokio sockets, which use ThreadPools by default (if I remember what I read).
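For readers who want something concrete, a manager of this kind might look roughly like the sketch below. The real Manager is not shown in this thread, so the fields, the methods, and the produce_from_threads helper are all assumptions; plain threads stand in for the sockets.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical queue wrapper; the actual `Manager` may differ.
struct Manager {
    queue: VecDeque<Vec<u8>>,
}

impl Manager {
    fn new() -> Self {
        Manager { queue: VecDeque::new() }
    }

    /// Writers append requests at the back of the queue.
    fn push(&mut self, msg: Vec<u8>) {
        self.queue.push_back(msg);
    }

    /// Readers take the oldest request from the front.
    fn pop(&mut self) -> Option<Vec<u8>> {
        self.queue.pop_front()
    }

    fn len(&self) -> usize {
        self.queue.len()
    }
}

/// Spawns `n` writer threads that each push one message, then reports how
/// many messages ended up in the shared queue.
fn produce_from_threads(n: usize) -> usize {
    let manager = Arc::new(Mutex::new(Manager::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            // Each thread owns its own handle to the same queue.
            let m = Arc::clone(&manager);
            thread::spawn(move || {
                m.lock().unwrap().push(vec![i as u8]);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let guard = manager.lock().unwrap();
    guard.len()
}
```

Each writer gets its own Arc clone, so no thread borrows anything from the function that created the queue.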

In this code, I'm just trying to get one socket to push onto the shared manager instance. I feel if I can get that, the next steps are easy.

So, here is my revised code:

let addr = "127.0.0.1:6142".parse().unwrap();
let socket = TcpListener::bind(&addr)?;

let manager = Arc::new(Mutex::new(Manager::new()));
let m_clone = Arc::clone(&manager);

let done = socket
    .incoming()
    .map_err(|e| println!("failed to accept socket; error = {:?}", e))
    .for_each(move |socket| {
        let framed = BytesCodec::new().framed(socket);
        let (_writer, reader) = framed.split();

        let processor = reader
            .for_each(|bytes| {
                m_clone.lock().unwrap().push(bytes); // There will be a decoding step here, but that's already tested
                Ok(())
            }).or_else(|err| {
                println!("Socket closed with error: {:?}", err);
                Err(err)
            }).then(|result| {
                println!("Socket closed with result: {:?}", result);
                Ok(())
            });

        tokio::spawn(processor)
    });

tokio::run(done);

Ok(())

All that said, I still get the same "does not live long enough" error with m_clone or manager. I've also tried using a reference.

As usual, I think my problems really stem from a deeper misunderstanding that led to a poor fundamental design choice. If there is a larger structural change, I'm happy to make that. Thanks again.

The problem is still the same: your m_clone is moved into the outer for_each closure, and then you borrow it in the inner for_each closure, which may be executed after the outer for_each scope has been left (so m_clone does not live long enough). However, since you are using an Arc, the simplest fix (I don't know if it is the best; that really depends on the case) would be to just move the cloned Arc into the inner for_each, something like this:

let socket = TcpListener::bind(&addr)?;

let manager = Arc::new(Mutex::new(Manager::new()));
let m_clone = Arc::clone(&manager);

let done = socket
    .incoming()
    .map_err(|e| println!("failed to accept socket; error = {:?}", e))
    .for_each(move |socket| {
        let framed = BytesCodec::new().framed(socket);
        let (_writer, reader) = framed.split();

        let processor = reader
            .for_each(move |bytes| {
                m_clone.lock().unwrap().push(bytes); // There will be a decoding step here, but that's already tested
                Ok(())
            }).or_else(|err| {
                println!("Socket closed with error: {:?}", err);
                Err(err)
            }).then(|result| {
                println!("Socket closed with result: {:?}", result);
                Ok(())
            });

        tokio::spawn(processor)
    });

tokio::run(done);

Ok(())

I see, so move into both for_each closures. I tried that and got a new (and interesting) error. Incidentally, in my shotgun attempts to work this out before posting, I must have tried this before because I remember this error.

error[E0507]: cannot move out of captured outer variable in an FnMut closure
--> src/main.rs:81:27
|
71 | let m_clone = Arc::clone(&manager);
| ------- captured outer variable
...
81 | .for_each(move |bytes| {
| ^^^^^^^^^^^^ cannot move out of captured outer variable in an FnMut closure

error: aborting due to previous error

For more information about this error, try rustc --explain E0507.

The m_clone should be created inside the first closure, like so:

    let socket = TcpListener::bind(&addr)?;

    let manager = Arc::new(Mutex::new(Manager::new()));

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let framed = BytesCodec::new().framed(socket);
            let (_writer, reader) = framed.split();
            // Clone here, which will then be `move`'d into the inner closure
            let m_clone = Arc::clone(&manager);
            let processor = reader
                .for_each(move |bytes| {
                    m_clone.lock().unwrap().push(bytes); // There will be a decoding step here, but that's already tested
                    Ok(())
                })
                .or_else(|err| {
                    println!("Socket closed with error: {:?}", err);
                    Err(err)
                })
                .then(|result| {
                    println!("Socket closed with result: {:?}", result);
                    Ok(())
                });

            tokio::spawn(processor)
        });

    tokio::run(done);

    Ok(())
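The reason the clone has to happen inside the outer closure: that closure runs once per connection, so anything it gives away to an inner move closure must be produced fresh on every call. Here is a sketch of the same clone-per-call pattern with plain threads standing in for tokio::spawn. The names handle_connections and on_connection are made up, and a Vec stands in for the Manager.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the accept loop: every "connection" is handled
/// on its own thread, and all of them push into one shared vector.
fn handle_connections(connections: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    let manager: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();

    // Plays the role of the outer `for_each` closure: called once per connection.
    let mut on_connection = |bytes: Vec<u8>| {
        // A fresh clone on every call. Moving `manager` itself into the
        // spawned closure would consume it on the first call, leaving
        // nothing to hand to the second connection.
        let m_clone = Arc::clone(&manager);
        handles.push(thread::spawn(move || {
            m_clone.lock().unwrap().push(bytes);
        }));
    };

    for bytes in connections {
        on_connection(bytes);
    }
    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the copy to a local first so the lock guard is released
    // before `manager` goes out of scope.
    let collected = manager.lock().unwrap().clone();
    collected
}
```

The order of the collected items depends on thread scheduling, so callers should not rely on it.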

That's perfect! Thank you @vitalyd and @hashedone. To test my understanding, I created an additional clone and added some code to the .then() closure to print out all the data received over the socket until it was closed. I also created a small separate app that sends data at random intervals so I could watch it.

For all the googlers interested in the final solution, here's what I have:

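// Imports assumed for tokio 0.1 (the version that has `tokio::run`).
// `Manager` is my own queue type and is not shown here.
use std::sync::{Arc, Mutex};
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {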
    let addr = "127.0.0.1:6142".parse().unwrap();
    let socket = TcpListener::bind(&addr)?;
    println!("Listening on: {}", addr);

    let manager = Arc::new(Mutex::new(Manager::new()));

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let framed = BytesCodec::new().framed(socket);
            let (_writer, reader) = framed.split();

            let m_clone = Arc::clone(&manager);  // for use in the closure that receives data
            let m_clone2 = Arc::clone(&m_clone); // for use in the closure that prints *all* received data after socket is closed

            let processor = reader
                .for_each(move |bytes| {
                    // Some decoding happens here
                    m_clone
                        .lock()
                        .unwrap()
                        .push(bytes);
                    Ok(())
                }).or_else(|err| {
                    println!("Socket closed with error: {:?}", err);
                    Err(err)
                }).then(move |result| {
                    println!("Socket closed with result: {:?}", result);

                    // Just checking that the manager has all the info
                    let mut mgr = m_clone2.lock().unwrap();
                    for item in mgr.iter() {
                        println!("{:?}", item);
                    }

                    Ok(())
                });

            tokio::spawn(processor)
        });

    tokio::run(done);

    Ok(())
} 

Thanks again! Great community.