I am trying to implement a low-level message broker that uses as few resources as possible. To that end, I am trying to create the smallest possible proof of concept as a learning experience:
- Using tokio to send a stream of data across a TCP socket.
- Capturing that data stream and, when the stream is done, adding the finished result to a vector.
I have read through the tokio guide and most of the documentation, and ended up drawing from this example: print_each_packet.rs in the tokio-rs/tokio repository on GitHub (master branch).
So, onto the code and the problem. I declare a vector outside all the closures, and try to use that vector to hold the data from the open socket. Then, once the socket is closed, I use the completed message.
But I get an error concerning the lifetime of the data vector:
error[E0597]: `*data` does not live long enough
  --> src/main.rs:87:21
   |
85 |                 .for_each(|bytes| {
   |                           ------- capture occurs here
86 |                     println!("bytes: {:?}", bytes);
87 |                     data.push(bytes);
   |                     ^^^^ borrowed value does not live long enough
   |
   = note: borrowed value must be valid for the static lifetime...
note: ...but borrowed value is only valid for the lifetime as defined on the body at 80:19
  --> src/main.rs:80:19
   |
80 |         .for_each(move |socket| {
   |                   ^^^^^^^^^^^^^
My code is below.
    let addr = "127.0.0.1:6142".parse().unwrap();
    let socket = TcpListener::bind(&addr)?;
    println!("Listening on: {}", addr);
    // let manager = Arc::new(Mutex::new(Manager::new()));
    // manager.lock().unwrap().add(String::from("one"));
    // let m1 = Arc::clone(&manager);
    let mut data = vec![];
    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let framed = BytesCodec::new().framed(socket);
            let (_writer, reader) = framed.split();
            let processor = reader
                .for_each(|bytes| {
                    println!("bytes: {:?}", bytes);
                    data.push(bytes);
                    Ok(())
                })
                // After our copy operation is complete we just print out some helpful
                // information.
                .and_then(|()| {
                    println!("Socket received FIN packet and closed connection");
                    println!("{:?}", data);
                    Ok(())
                }).or_else(|err| {
                    println!("Socket closed with error: {:?}", err);
                    // We have to return the error to catch it in the next `.then` call
                    Err(err)
                }).then(|result| {
                    println!("Socket closed with result: {:?}", result);
                    Ok(())
                });
            tokio::spawn(processor)
        });
    tokio::run(done);
    Ok(())
I think I understand what it's complaining about. I am moving data into closures which may outlive the function that data was declared in. Incidentally, this is the main() function. What is the way to satisfy these lifetimes?
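The 'static requirement can be reproduced without tokio. The sketch below is not the tokio code above: it assumes std::thread::spawn as a stand-in for tokio::spawn (both require the spawned work to be 'static), and collect_chunks is a hypothetical helper. It shows one possible way to satisfy the bound: create the buffer inside the task, so the task owns it instead of borrowing it from the enclosing closure.

```rust
use std::thread;

// Hypothetical stand-in for one connection's task. The buffer is created
// inside the spawned closure, so nothing is borrowed from the caller and
// the closure satisfies the 'static bound.
fn collect_chunks(chunks: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    let handle = thread::spawn(move || {
        let mut data = Vec::new(); // owned by the task itself
        for bytes in chunks {
            data.push(bytes);
        }
        data // handed back once the "connection" is finished
    });
    handle.join().expect("task panicked")
}

fn main() {
    let received = collect_chunks(vec![b"hello ".to_vec(), b"world".to_vec()]);
    println!("{:?}", received);
}
```

This only covers a buffer that lives for one connection. Data that has to be visible across connections would need shared ownership instead.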
Ultimately, I'm using a vector because I already have a data structure designed that acts like a vector, but is wrapped in an Arc and a Mutex (let manager = Arc::new(Mutex::new(Manager::new()));) to work across multiple threads.
In the end, this should accept messages from an incoming socket, decode the message, and insert the message into the manager (a queue). That's it.
As a side note, I've used other tokio examples and successfully moved the manager instance into closures without a problem.
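For reference, the pattern that works with the manager looks roughly like the sketch below. It is a std-only illustration under assumptions: threads stand in for spawned tokio tasks, the Manager here is a hypothetical stand-in with only new and add (plus a len helper added for the example), and run_connections is a made-up name. Each task gets its own Arc clone moved into it, so the closure owns a handle instead of borrowing from main().

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the real Manager queue.
#[derive(Default)]
struct Manager {
    queue: VecDeque<String>,
}

impl Manager {
    fn new() -> Self {
        Self::default()
    }

    fn add(&mut self, message: String) {
        self.queue.push_back(message);
    }

    fn len(&self) -> usize {
        self.queue.len()
    }
}

// Spawns one task per message (standing in for one task per connection)
// and returns how many messages ended up in the shared manager.
fn run_connections(messages: Vec<String>) -> usize {
    let manager = Arc::new(Mutex::new(Manager::new()));

    let handles: Vec<_> = messages
        .into_iter()
        .map(|message| {
            // Clone the handle outside the task, then move the clone in.
            let manager = Arc::clone(&manager);
            thread::spawn(move || {
                manager.lock().unwrap().add(message);
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("task panicked");
    }

    let stored = manager.lock().unwrap().len();
    stored
}

fn main() {
    let stored = run_connections(vec!["one".to_string(), "two".to_string()]);
    println!("{} messages stored", stored);
}
```

The difference from the plain vector is that the Arc can be cloned once per connection, so no closure has to give up or borrow the only copy.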