HashMap doesn't outlive the executed futures

Hi, everybody.

I'm trying to figure out what I'm doing wrong in code that copies headers inside a future which runs once per user connected via WebSockets. Those headers will later be used to check and validate a token (and probably for other kinds of stuff later, for example checking permissions). In this case the headers should outlive not only the future that initializes the headers variable, but should stay alive until the user (or the server) closes the established connection.

Can anybody help me with it and, if possible, explain why the compiler is telling me I'm doing something wrong? And what would be the best solution for this particular case? Swapping the lines, as mentioned on the error-index page, leads to an error about the "context" of the variable instead (or that headers isn't found in this scope).

So, this is the code where I'm getting the error:

pub fn run(&self, address: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&address, &handle).unwrap();
    println!("Listening on: {}", address);

    let server = socket.incoming().for_each(|(stream, addr)| {
        let engine_inner = self.engine.clone();
        let connections_inner = self.connections.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        let handle_inner = handle.clone();

        let mut headers: HashMap<String, Box<[u8]>> = HashMap::new();
        let copy_headers_callback = |request: &Request| {
            for &(ref name, ref value) in request.headers.iter() {
                headers.insert(name.to_string(), value.clone());
            }
            Ok(None)
        };

        accept_hdr_async(stream, copy_headers_callback)
            // Process the messages
            .and_then(move |ws_stream| {
            // ...
    });

    // Run the server
    core.run(server).unwrap();
}

And the output that was generated by the compiler:

error[E0597]: `headers` does not live long enough
   --> src/proxy.rs:60:21
    |
58  |             let copy_headers_callback = |request: &Request| {
    |                                         ------------------- capture occurs here
59  |                 for &(ref name, ref value) in request.headers.iter() {
60  |                     headers.insert(name.to_string(), value.clone());
    |                     ^^^^^^^ does not live long enough
...
111 |         });
    |         - borrowed value only lives until here
...
115 |     }
    |     - borrowed value needs to live until here

Can you show the rest of the code, namely the elided portions here:

accept_hdr_async(stream, copy_headers_callback)
            // Process the messages
            .and_then(move |ws_stream| {
            // ... <-- HERE

The HashMap is created inside each for_each invocation and then it's borrowed by the copy_headers_callback closure - this means the HashMap needs to be alive while that closure has a reference to it. That callback closure is then moved into accept_hdr_async, and wrapped in an AndThen struct representing the and_then(move |ws_stream| ...) call. What happens after that? I suspect at some point you might be sending some struct that's keeping the copy_headers_callback closure back into the reactor (Core). At that point, the copy_headers_callback closure might outlive the HashMap as far as the borrow checker is concerned.
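To show the shape of the problem in isolation, here's a tiny sketch with simplified, made-up signatures (not the real tungstenite API): a closure that only borrows the map can't escape the scope that owns it, whereas a move closure takes the map with it.

use std::collections::HashMap;

// Hypothetical helper: returning the closure stands in for "handing it to
// something that may keep it alive longer than this stack frame".
fn make_callback() -> Box<FnMut(String, Vec<u8>)> {
    let mut headers: HashMap<String, Vec<u8>> = HashMap::new();
    // Without `move` the closure would merely borrow `headers` and the
    // compiler would reject returning it ("`headers` does not live long
    // enough"); with `move` the map is owned by the closure and escapes
    // together with it.
    Box::new(move |name, value| {
        headers.insert(name, value);
    })
}

Note that in your code `move` alone wouldn't be enough, because the later and_then closure also wants to read the same map.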

I realize this is code from your other thread, but I'm not sure what the full code looks like now. As I mentioned in that thread, it's not immediately clear to me how to share a per-connection HashMap between the main connection handling and another future that does the Redis lookup asynchronously.

It would seem that tokio-tungstenite should allow the Callback to return its own auxiliary data from the on_request() method. That way you could create the HashMap inside the copy_headers_callback closure itself and then return it from there. AsyncAccept could then forward that value to your and_then. But this would be a change to their API, so it's not going to help you right now either way.

@vitalyd, this is the full listing of code:

use std::cell::{RefCell};
use std::collections::{HashMap};
use std::error::{Error};
use std::net::{SocketAddr};
use std::rc::{Rc};

use super::engine::{Engine};
use super::router::{Router};
use super::middleware::{Headers, Middleware, EmptyMiddleware};
// use super::token::middleware::{JwtTokenMiddleware};

use cli::{CliOptions};
use futures::sync::{mpsc};
use futures::{Future, Sink};
use futures::stream::{Stream};
use tokio_core::net::{TcpListener};
use tokio_core::reactor::{Core};
use tokio_tungstenite::{accept_hdr_async};
use tungstenite::handshake::server::{Request};
use tungstenite::protocol::{Message};


pub struct Proxy {
    engine: Rc<RefCell<Engine>>,
    connections: Rc<RefCell<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
    auth_middleware: Rc<RefCell<Box<Middleware>>>,
}


impl Proxy {
    pub fn new(router: Box<Router>, cli: &CliOptions) -> Proxy {
        let auth_middleware: Box<Middleware> = Box::new(EmptyMiddleware::new(cli));

        Proxy {
            engine: Rc::new(RefCell::new(Engine::new(router))),
            connections: Rc::new(RefCell::new(HashMap::new())),
            auth_middleware: Rc::new(RefCell::new(auth_middleware)),
        }
    }

    pub fn run(&self, address: SocketAddr) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let socket = TcpListener::bind(&address, &handle).unwrap();
        println!("Listening on: {}", address);

        let server = socket.incoming().for_each(|(stream, addr)| {
            let engine_inner = self.engine.clone();
            let connections_inner = self.connections.clone();
            let auth_middleware_inner = self.auth_middleware.clone();
            let handle_inner = handle.clone();

            let mut headers: Headers = HashMap::new();
            let copy_headers_callback = |request: &Request| {
                for &(ref name, ref value) in request.headers.iter() {
                    headers.insert(name.to_string(), value.clone());
                }
                Ok(None)
            };

            accept_hdr_async(stream, copy_headers_callback)
                // Process the messages
                .and_then(move |ws_stream| {
                    // Create a channel for the stream, which other sockets will use to
                    // send us messages. It could be used for broadcasting your data to
                    // another users in the future.
                    let (tx, rx) = mpsc::unbounded();
                    connections_inner.borrow_mut().insert(addr, tx);

                    // Split the WebSocket stream so that it will be possible to work
                    // with the reading and writing halves separately.
                    let (sink, stream) = ws_stream.split();

                    let _auth_future = auth_middleware_inner.borrow().process_request(&headers, &handle_inner);

                    // Read and process each message
                    let connections = connections_inner.clone();
                    let ws_reader = stream.for_each(move |message: Message| {
                        engine_inner.borrow().handle(&message, &addr, &connections);
                        Ok(())
                    });

                    // Write back prepared responses
                    let ws_writer = rx.fold(sink, |mut sink, msg| {
                        sink.start_send(msg).unwrap();
                        Ok(sink)
                    });

                    // Wait for either half to be done to tear down the other
                    let connection = ws_reader.map(|_| ()).map_err(|_| ())
                                              .select(ws_writer.map(|_| ()).map_err(|_| ()));

                    // Close the connection after using
                    handle_inner.spawn(connection.then(move |_| {
                        connections_inner.borrow_mut().remove(&addr);
                        println!("Connection {} closed.", addr);
                        Ok(())
                    }));

                    Ok(())
                })
                // An error occurred during the WebSocket handshake
                .or_else(|err| {
                    println!("{}", err.description());
                    Ok(())
                })
        });

        // Run the server
        core.run(server).unwrap();
    }
}

In the code being discussed right now we're doing nothing special: we just create the HashMap instance before the per-client future is started. At the moment a user connects to the server, copy_headers_callback is invoked, copying the WebSocket headers as-is into the HashMap, and then accept_hdr_async kicks off the futures chain.

One possible idea for dealing with it is to make headers part of the Proxy struct, like this:

pub struct Proxy {
    engine: Rc<RefCell<Engine>>,
    connections: Rc<RefCell<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
    headers: Rc<RefCell<HashMap<Uuid, Headers>>>,
    auth_middleware: Rc<RefCell<Box<Middleware>>>,
}


impl Proxy {
    pub fn new(router: Box<Router>, cli: &CliOptions) -> Proxy {
        // ...
        let auth_middleware: Box<Middleware> = Box::new(EmptyMiddleware::new(cli));

        Proxy {
            engine: Rc::new(RefCell::new(Engine::new(router))),
            connections: Rc::new(RefCell::new(HashMap::new())),
            auth_middleware: Rc::new(RefCell::new(auth_middleware)),
            headers: Rc::new(RefCell::new(HashMap::new()))
        }
    }

After that, inside the server expression, we would specify (probably even generate at runtime) a unique identifier to be used as the key, and the value would be a copy of the actual WebSocket headers prepared in copy_headers_callback. At the final step, when the client closes the connection for whatever reason, we would delete all of those headers without any doubts. But from some point of view it looks like a dirty hack, doesn't it?
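Roughly, inside the for_each closure it would look something like this (a sketch only; the names are made up and it assumes the uuid crate for the key):

let connection_id = Uuid::new_v4();

let headers_registry = self.headers.clone();
let copy_headers_callback = move |request: &Request| {
    let mut per_connection: Headers = HashMap::new();
    for &(ref name, ref value) in request.headers.iter() {
        per_connection.insert(name.to_string(), value.clone());
    }
    // Store this connection's headers under its generated id.
    headers_registry.borrow_mut().insert(connection_id, per_connection);
    Ok(None)
};

// ... and in the connection.then(...) cleanup, next to removing the sender
// from `connections` (with another clone of self.headers moved in):
// headers_cleanup.borrow_mut().remove(&connection_id);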

Honestly, I'd rather prolong the lifetime of the object with Rc<T> and go with code like this:

pub fn run(&self, address: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&address, &handle).unwrap();
    println!("Listening on: {}", address);

    let server = socket.incoming().for_each(|(stream, addr)| {
        let engine_inner = self.engine.clone();
        let connections_inner = self.connections.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        let handle_inner = handle.clone();

        let mut headers: Rc<Headers> = Rc::new(HashMap::new());
        let copy_headers_callback = |request: &Request| {
            let mut headers_copy = headers.clone();
            for &(ref name, ref value) in request.headers.iter() {
                headers_copy.insert(name.to_string(), value.clone());
            }
            Ok(None)
        };

        accept_hdr_async(stream, copy_headers_callback)
            // Process the messages
            .and_then(move |ws_stream| {
                // Create a channel for the stream, which other sockets will use to
                // send us messages. It could be used for broadcasting your data to
                // another users in the future.
                let (tx, rx) = mpsc::unbounded();
                connections_inner.borrow_mut().insert(addr, tx);

                // Split the WebSocket stream so that it will be possible to work
                // with the reading and writing halves separately.
                let (sink, stream) = ws_stream.split();

                let headers_inner = headers.clone();
                let _auth_future = auth_middleware_inner.borrow().process_request(&headers_inner, &handle_inner);

But I have no idea how to make it much better. The compiler still generates the same error (plus a new one):

error[E0597]: `headers` does not live long enough
   --> src/proxy.rs:59:36
    |
58  |             let copy_headers_callback = |request: &Request| {
    |                                         ------------------- capture occurs here
59  |                 let headers_copy = headers.clone();
    |                                    ^^^^^^^ does not live long enough
...
113 |         });
    |         - borrowed value only lives until here
...
117 |     }
    |     - borrowed value needs to live until here

error[E0596]: cannot borrow immutable borrowed content as mutable
  --> src/proxy.rs:61:21
   |
61 |                     headers_copy.insert(name.to_string(), value.clone());
   |                     ^^^^^^^^^^^^ cannot borrow as mutable

You can try cloning the Rc outside the closures you’re passing them to and then move those clones into the closures.

You’ll need an Rc<RefCell<HashMap<...>>> so you can borrow mutably in the callback.
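Something along these lines (a sketch only, reusing your Headers alias and assuming the same process_request(&Headers, &Handle) signature as in your listing; the handler names are made up):

let headers: Rc<RefCell<Headers>> = Rc::new(RefCell::new(HashMap::new()));

// One clone per closure, made outside the closure and moved into it.
let headers_for_callback = headers.clone();
let copy_headers_callback = move |request: &Request| {
    let mut headers = headers_for_callback.borrow_mut();
    for &(ref name, ref value) in request.headers.iter() {
        headers.insert(name.to_string(), value.clone());
    }
    Ok(None)
};

let headers_for_handler = headers.clone();
accept_hdr_async(stream, copy_headers_callback)
    .and_then(move |ws_stream| {
        // Borrow the shared map only for the duration of the call.
        let _auth_future = auth_middleware_inner
            .borrow()
            .process_request(&*headers_for_handler.borrow(), &handle_inner);
        // ... the rest of the connection handling (split, spawn, etc.)
        // stays as before ...
        Ok(())
    })

Because the closures own Rc clones rather than borrowing the local, nothing has to outlive the for_each invocation that created it.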

I've run into similar problems with futures chains before, and usually solve them by moving data through the futures chain explicitly. This is harder in your case because of the upstream library, but if tokio_tungstenite's API looked something like

pub trait Callback<UserData>: Sized {
    fn on_request(
        self,
        request: &Request,
    ) -> Result<(Option<Vec<(String, String)>>, UserData)>;
}

struct AcceptAsync<S, C, UserData> { .. }

impl<S, C, UserData> Future for AcceptAsync<S, C, UserData> {
    type Item = (WebSocketStream<S>, UserData);
    // ...
}

Then you could make this just work. I find myself doing things like this often when writing my own futures-based code. I wonder if I'm the only one ...
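For what it's worth, here's a tiny self-contained illustration of that "thread the data through the chain" idea using plain futures 0.1 combinators (made-up values, nothing tungstenite-specific):

extern crate futures;

use futures::future::{self, Future};

fn main() {
    // Each step hands ownership of the data to the next one via the tuple
    // it resolves to, so nothing has to outlive its own stack frame.
    let chain = future::ok::<String, ()>("host: example.com".to_string())
        .and_then(|headers| {
            // Pretend this is the handshake; it forwards the headers along
            // with its own result.
            future::ok(("ws_stream placeholder", headers))
        })
        .and_then(|(ws_stream, headers)| {
            println!("{} accepted with {}", ws_stream, headers);
            future::ok(())
        });

    chain.wait().unwrap();
}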

You’re not the only one - I suggested that tungstenite should do that in an earlier post in this thread :slight_smile:. Threading data through like that is a good way to avoid borrow problems.

Ah, must have missed it. My bad :slight_smile: