How to store data received via POST inside a server?


#1

I’m new to Rust and server programming, but I’m building a project out of curiosity. I have an HTTP server built with hyper. Inside the call() method of my server (implemented from Service) I handle a POST request:
I receive a batch of data and want to store it inside my server (in one of the server’s fields), but I get an error that I don’t understand… Here is my code snippet:

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Batch {
	pub data: Vec<u8>,
}

impl Future for Batch {
	type Item = ();
	type Error = io::Error;

	fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
		if self.data.len() > 0 {
			Ok(Async::Ready(()))
		} else {
			Ok(Async::NotReady)
		}
	}
}

impl Batch {
	fn new() -> Batch {
		Batch{data: Vec::new()}
	}
}

struct Server{
	batch: Box<Option<Batch>>,
}

impl Service for Server {
	
    type Request  = Request;
    type Response = Response;
    type Error    = hyper::Error;
    type Future   = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {

         match (req.method(), req.path()) {            

            (&Method::Post, "/add_data") => {
        
                let send = req.body().concat2().map(|b| {

				    if let Ok(batch) = serde_json::from_slice::<Batch>(b.as_ref()) {
					    						    						    
					    match batch.poll() {
					    	Ok(Async::Ready(_)) => {
					    		self.batch = Box::new(Some(batch.clone()));
					    	}
					    	Ok(Async::NotReady) => {}

					    	_ => {}
					    }

					    println!("{:?}", batch);

					    Response::new().with_status(StatusCode::Ok)						    

				    } else {
				    	Response::new().with_status(StatusCode::NoContent)
				    }					 
                });
                
                Box::new(send)
            },           

            _ => {
                Box::new(futures::future::ok(
                    Response::new().with_status(StatusCode::NotFound)
                ))
            }
            
        }
    }
}

pub fn run_server() {
	let addr = "127.0.0.1:3000".parse().unwrap();
    let server = Http::new().bind(&addr, || Ok(Server{batch: Box::new(None)})).unwrap();
    server.run().unwrap();
}

fn main() {
    run_server();
}

and error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src\main.rs:91:17
   |
91 |                 Box::new(send)
   |                 ^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 63:5...
  --> src\main.rs:63:5
   |
63 | /     fn call(&self, req: Request) -> Self::Future {
64 | |
65 | |          match (req.method(), req.path()) {
66 | |
...  |
100| |         }
101| |     }
   | |_____^
note: ...so that the type `futures::Map<futures::stream::Concat2<hyper::Body>, [closure@src\main.rs:69:53: 89:18 self:&mut &Server]>` will meet its required lifetime bounds
  --> src\main.rs:91:17
   |
91 |                 Box::new(send)
   |                 ^^^^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static>, found std::boxed::Box<futures::Future<Error=hyper::Error, Item=hyper::Response>>)
  --> src\main.rs:91:17
   |
91 |                 Box::new(send)
   |                 ^^^^^^^^^^^^^^

What did I do wrong? How do I properly store the received batch of data so that I can use it later in another POST handler?


#2

What is the error?


#4

The issue is the closure in call borrows a reference to self because it’s trying to assign to the batch field. The closure, however, needs to be 'static (ie has no references to anything other than 'static references); that’s because type Future = Box<Future<Item=Self::Response, Error=Self::Error>>; is actually type Future = Box<Future<Item=Self::Response, Error=Self::Error> + 'static>;

There’s a second problem here, which the compiler didn’t get to yet: call() takes a &self (not &mut self). This will prevent you from modifying the batch field as well.

To solve the &self issue, you’ll need to employ interior mutability. Put the batch into a RefCell: RefCell<Option<Batch>> and use borrow_mut() to get a mutable borrow.

To solve the lifetime issue, further encapsulate the batch into an Rc: Rc<RefCell<Option<Batch>>>. Then move a clone of the Rc into the closure. The code would then look roughly like:

let batch_clone = self.batch.clone();
...
let send = req.body().concat2().map(move |b| {
...
*batch_clone.borrow_mut() = Some(batch.clone());
...
}
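
To make that concrete without pulling in hyper, here is a minimal std-only sketch of the same pattern. The boxed 'static closure returned by call() is only a stand-in for the boxed future (an assumption for illustration, not hyper's API), and store_one is a hypothetical helper. The point is that the closure owns a clone of the Rc, so it never borrows self:

```rust
use std::cell::RefCell;
use std::rc::Rc;

#[derive(Debug, Clone, PartialEq)]
struct Batch {
    data: Vec<u8>,
}

struct Server {
    // Shared, interior-mutable slot for the most recent batch.
    batch: Rc<RefCell<Option<Batch>>>,
}

impl Server {
    // Takes &self, like Service::call, and returns a boxed 'static closure
    // that stands in for the boxed future (assumption for this sketch).
    fn call(&self) -> Box<dyn FnOnce(Vec<u8>) + 'static> {
        // Cloning the Rc only bumps a reference count; the closure gets its
        // own handle and therefore holds no reference to `self`.
        let slot = Rc::clone(&self.batch);
        Box::new(move |body: Vec<u8>| {
            *slot.borrow_mut() = Some(Batch { data: body });
        })
    }
}

// Hypothetical helper: runs one simulated request and returns what was stored.
fn store_one(body: Vec<u8>) -> Option<Batch> {
    let server = Server { batch: Rc::new(RefCell::new(None)) };
    let handler = server.call();
    handler(body);
    let stored = server.batch.borrow().clone();
    stored
}

fn main() {
    println!("{:?}", store_one(vec![1, 2, 3]));
}
```

If the closure wrote to self.batch directly instead of to slot, it would capture &self and the 'static bound could not be met.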

#5

This concept of lifetimes drives me mad… I got your idea and tried to implement it, but I face the same lifetime problem when my Block has a lifetime parameter <'t> in its id field, and even when I change &'t str to String. Here is the code snippet:

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Block<'t> {
	id:  &'t str, // the same with String
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Batch<'t> {
	#[serde(borrow)]
	data: Vec<Block<'t>>, 
}

struct Server<'t> {
	data: Rc<RefCell<Option<Batch<'t>>>>,
}

impl<'t> Service for Server<'t> {
	
    type Request  = Request;
    type Response = Response;
    type Error    = hyper::Error;
    type Future   = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {

         match (req.method(), req.path()) {            

            (&Method::Post, "/batch_insert") => {

            	let data = self.data.clone();
        
                let send = req.body().concat2().map(move |b| {

				    if let Ok(batch) = serde_json::from_slice::<Batch<'t>>(b.as_ref()) {
					    						    						    					   
					    *data.borrow_mut() = Some(batch.clone());
					    	
					    println!("Data:\n{:?}", self.data);

					    Response::new().with_status(StatusCode::Ok)						    

				    } else {
				    	Response::new().with_status(StatusCode::NoContent)
				    }					 
                });
                
                Box::new(send)
            },           

            _ => {
                Box::new(futures::future::ok(
                    Response::new().with_status(StatusCode::NotFound)
                ))
            }
            
        }
    }
}

Here is the text of the error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src\main.rs:74:17
   |
74 |                 Box::new(send)
   |                 ^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime 't as defined on the impl at 44:1...
  --> src\main.rs:44:1
   |
44 | / impl<'t> Service for Server<'t> {
45 | |
46 | |     type Request  = Request;
47 | |     type Response = Response;
...  |
84 | |     }
85 | | }
   | |_^
note: ...so that the type `futures::Map<futures::stream::Concat2<hyper::Body>, [closure@src\main.rs:59:53: 72:18 data:std::rc::Rc<std::cell::RefCell<std::option::Option<Batch<'t>>>>, self:&Server<'t>]>` will meet its required lifetime bounds
  --> src\main.rs:74:17
   |
74 |                 Box::new(send)
   |                 ^^^^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static>, found std::boxed::Box<futures::Future<Error=hyper::Error, Item=hyper::Response>>)
  --> src\main.rs:74:17
   |
74 |                 Box::new(send)
   |                 ^^^^^^^^^^^^^^

I thought the Rc wrapper would help me… What’s wrong now?


#6

Change Block to hold a String (or Box<str>) - that will let you drop the lifetime parameter on the types.

The other issue is you’re still referencing (and thus capturing) self in the closure - this time to do the println!. Change that to use the data local:

println!("Data:\n{:?}", data.borrow());

#7

I need the id field to be &'t str, otherwise I can’t deserialize Block<'t>.
Besides, Box<&'t str> doesn’t help: the error still exists.


#8

Why can’t you deserialize into a String?

Right, Box<&'t str> won’t help; I was suggesting Box<str>, which is just a slightly more compact version of String.


#9

Yes, you’re right: changing &'t str to str compiles.
But now I have cognitive dissonance: what is the difference between the two types, &str and str?
In my logic I used &'t str inside structures. So can I change &'t str to str everywhere without breaking the code?


#10

str is the string slice type. It’s a dynamically sized type (DST). I’m going to ignore custom DSTs below to make this simpler.

To make it sized, you use it behind some fat pointer, such as &str - this is a fat pointer consisting of a pointer to the start of the slice and its length. But the more pertinent to this discussion aspect is that this is a borrowed string slice - it’s a reference with a lifetime.

You can also have owned slices, eg Box<str>, Rc<str>, Arc<str>. These internally also have a fat pointer but they own the backing heap allocation that stores the actual bytes of the string. As such, they’re owning types (Rc and Arc participate in shared ownership, but it’s still ownership) - they don’t reference or borrow the string slice from anyone else, and therefore don’t need a lifetime parameter.

Why did I suggest Box<str>? A String is a Vec<u8> underneath. A Vec is 3 words: ptr to heap storage of the data, length, and capacity. If you know you’re not going to be pushing more elements to the string (ie it’s fixed size), you can turn it into a Box<str> - this drops the capacity field. You’re left with the box holding a fat pointer, which has the ptr to the start of the data and its length; no need for capacity anymore. You can turn a String into this form using its into_boxed_str method.

I hope the above explains why this won’t work.
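
A small sketch of the size difference described above, using only the standard library. Block here is a simplified stand-in for the type in this thread and make_block is a hypothetical helper; the word counts hold on current Rust implementations, where String is a Vec<u8> underneath:

```rust
use std::mem::size_of;

// An owned, fixed-size string: no lifetime parameter is needed on Block.
#[derive(Debug, Clone, PartialEq)]
struct Block {
    id: Box<str>,
}

// Hypothetical helper: converts a String into the more compact Box<str>,
// dropping the capacity field along the way.
fn make_block(id: String) -> Block {
    Block { id: id.into_boxed_str() }
}

fn main() {
    let word = size_of::<usize>();
    // &str and Box<str> are fat pointers: data pointer + length.
    assert_eq!(size_of::<&str>(), 2 * word);
    assert_eq!(size_of::<Box<str>>(), 2 * word);
    // String additionally carries a capacity.
    assert_eq!(size_of::<String>(), 3 * word);

    let block = make_block(String::from("abc"));
    println!("{:?}", block);
}
```

Both &str and Box<str> have the same layout; the difference is that Box<str> owns its bytes, which is why Block needs no lifetime parameter.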


#11

That is very clear. Thank you for your time, I appreciate it. There is only one situation I don’t understand:
in the context of my server logic (inside Server) it is impossible to store and process a plain &str, isn’t it? So inside the server I should use an owning type such as Box, Rc, Arc or String… Am I right?


#12

No problem - happy to help.

In short, yes - you’re right.

The slightly longer version is for you to store a &str inside your Server means you’ve borrowed it from somewhere else. That’ll require a lifetime parameter on your Server, which is what you have now with Server<'t>. On its own, there’s absolutely nothing wrong in storing references; in fact, one can say this should be encouraged since it’ll generally yield better performance and there’s virtually no risk in having memory safety issues since the compiler will verify everything is sound.

The trouble/awkwardness comes along when using hyper (to be fair, it’s not really due to hyper - tokio, the underlying async I/O stack, requires it). Hyper will heavily steer your Service implementations to not have references (i.e. for them to be 'static). For example, if you look at https://docs.rs/hyper/0.11.20/hyper/server/struct.Http.html#method.bind, you’ll see that the NewService impl must be 'static. If it’s 'static, it means it doesn’t hold any references (other than 'static ones). Since it’s the one that creates new Service instances, there are no references that it can pass to them - so the Service cannot have any references either.

The main reason for this is because, internally, it’ll use tokio’s Handle::spawn() to submit a future to the event loop that’s responsible for servicing accepted http connections. spawn, in turn, requires the future given to it to be 'static.

So, when you’re working with hyper (or tokio for that matter), try to stay away from references in your core types. You’ll have a much easier time.
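
As a rough std-only illustration of what a 'static bound accepts: require_static below is a hypothetical function that imposes the same kind of bound spawn does, and OwnedServer is a made-up type for this sketch, not anything from hyper or tokio.

```rust
// Accepts only values that contain no borrowed (non-'static) data.
// Hypothetical stand-in for the bound that spawn puts on its future.
fn require_static<T: 'static>(value: T) -> T {
    value
}

// A server type that owns its data, so it satisfies 'static.
struct OwnedServer {
    id: String,
}

// Copies the borrowed text into an owned String before building the server.
fn owned_id(raw: &str) -> String {
    let server = require_static(OwnedServer { id: raw.to_owned() });
    server.id
}

fn main() {
    let local = String::from("abc");
    println!("{}", owned_id(&local));

    // By contrast, a struct borrowing from `local` is rejected by the bound:
    // struct BorrowingServer<'t> { id: &'t str }
    // require_static(BorrowingServer { id: &local }); // does not compile
}
```

The commented-out lines fail because &local only lives as long as main's stack frame, while the bound demands data that could live for the whole program.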


#13

I found one bug: when I change the data in one POST request, I don’t see those changes in another POST request…
Should I use Arc<Mutex<RefCell<Option<Batch>>>> to fix it?


#14

You don’t need Arc or Mutex. What you probably need is to have your NewService impl (you have one, right?) maintain the Rc<RefCell<Option<Batch>>>, and then hand out a clone of that to each Service instance it creates.


#16

You want something like this:

let addr = "127.0.0.1:8080".parse().unwrap();

let store = Rc::new(RefCell::new(StoreTick::new(10000)));

let server = Http::new().bind(&addr, move || Ok(Server {data: store.clone()})).unwrap();

server.run().unwrap();

Every new client connection will get a new Server instance, but they’ll all share the store.
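
The sharing can be checked in isolation with a std-only sketch. The Vec<u32> store, the new_service closure and two_connections are hypothetical stand-ins for StoreTick, the closure passed to bind, and two client connections:

```rust
use std::cell::RefCell;
use std::rc::Rc;

struct Server {
    data: Rc<RefCell<Vec<u32>>>,
}

// Simulates two connections served by two Server instances built by the
// same factory closure, and returns what the second instance observes.
fn two_connections() -> Vec<u32> {
    let store = Rc::new(RefCell::new(Vec::new()));
    // The factory owns `store` and hands a clone of the Rc to each Server.
    let new_service = move || Server { data: Rc::clone(&store) };

    let first = new_service();
    let second = new_service();

    // A write through the first instance...
    first.data.borrow_mut().push(42);
    // ...is visible through the second, because both point at one RefCell.
    let seen = second.data.borrow().clone();
    seen
}

fn main() {
    println!("{:?}", two_connections());
}
```

If the factory created a fresh Rc::new(...) on every call instead of cloning the captured one, each Server would get its own empty store.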


#18

Can you show add() and find()? Add some logging to your code to make sure the code paths you expect to be traversed are.


#19

Here is my code:

struct Server {
	data: Rc<RefCell<Store>>,
}

impl Service for Server {
	
    type Request  = Request;
    type Response = Response;
    type Error    = hyper::Error;
    type Future   = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {

         match (req.method(), req.path()) {            

            (&Method::Post, "/add") => {
            	let send = add(self.data.clone(), req);
            	println!("Store:\n{}", self.data.borrow());
            	send           
            } 

            (&Method::Post, "/find") => {
                // Here I don't see changes with self.data!
            	let send = find(self.data.clone(), req);
            	send                       
            }                      

            _ => {
                Box::new(futures::future::ok(
                    Response::new().with_status(StatusCode::BadRequest)
                ))
            }
            
        }
    }
}

fn add(data: Rc<RefCell<StoreTick>>, req: Request) -> Box<Future<Item=Response, Error=hyper::Error>> {

	let send = req.body().concat2().map(move |b| {

	    if let Ok(batch) = serde_json::from_slice::<BatchTick>(b.as_ref()) {
		    		
		    let mut store = data.borrow_mut();
                
            (*store).insert(batch.clone());
		    		    		    
		    Response::new().with_status(StatusCode::Ok)						    						    					   					    				  
	    } else {
	    	Response::new().with_status(StatusCode::NoContent)
	    }					 
    });
                
    Box::new(send)
}

fn find(data: Rc<RefCell<StoreTick>>, req: Request) -> Box<Future<Item=Response, Error=hyper::Error>> {

	let send = req.body().concat2().map(move |b| {

	    if let Ok(need) = serde_json::from_slice::<RequestTick>(b.as_ref()) {
		    		    		
		    let mut store = data.borrow_mut();

             // Here I don't see changes with store!
		    		
		    if let Some(solution) = (*store).search(... some parameters...) {

		    		Response::new().with_status(StatusCode::Ok)
		                               .with_body(serde_json::to_vec(&solution).unwrap())
		    } else {
		    		Response::new().with_status(StatusCode::NotFound)
		    }					    	
		    	
	    } else {
	    	Response::new().with_status(StatusCode::NoContent)
	    }					 
    });

    Box::new(send)
}

#20

So StoreTick is itself a Future? What might be happening is store.poll() returns Async::NotReady in add() and then you end up dropping the batch on the floor. Add some logging to confirm.

You’re not supposed to poll() futures from the “outside”. Instead, you should be returning a future that represents completion of it, and then doing something.

Can you explain what StoreTick is doing asynchronously?


#21

I deleted the Future implementation for Store, but it had no effect… It seems I made a small mistake out of a big misunderstanding. Store is a wrapper around std::collections::LinkedList.

Now I see that in the first POST the changes are applied only to self.data.clone() but not to self.data!


#22

Are you basing this on the println! inside the call() method? The modification to the store is part of the future you’re returning - that future doesn’t run until you hand it back to hyper and it puts it on the reactor.
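
Here is a std-only sketch of that ordering. A boxed closure stands in for the returned future, and calling it stands in for the reactor running it; both are simplifications for illustration, and lengths_before_and_after is a hypothetical helper:

```rust
use std::cell::RefCell;
use std::rc::Rc;

type Store = Rc<RefCell<Vec<u32>>>;

// Returns deferred work, the way add() returns a future: nothing is
// inserted until the returned closure is actually run.
fn add(store: Store, value: u32) -> Box<dyn FnOnce()> {
    Box::new(move || {
        store.borrow_mut().push(value);
    })
}

// Returns the store length before and after the deferred work runs.
fn lengths_before_and_after() -> (usize, usize) {
    let store: Store = Rc::new(RefCell::new(Vec::new()));
    let pending = add(Rc::clone(&store), 7);
    // Same position as the println! in call(): the work has not run yet.
    let before = store.borrow().len();
    // Stand-in for the reactor driving the future to completion.
    pending();
    let after = store.borrow().len();
    (before, after)
}

fn main() {
    println!("{:?}", lengths_before_and_after());
}
```

The clone and the original point at the same store; the only thing that differs between the two observations is whether the deferred work has run.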


#23

How do I do that? That’s my basic question. I thought that server.run().unwrap() would run my server with all the futures inside it and keep my stored data coordinated. But I deleted the Future implementation for my data container Store and it had no effect - the store is still empty.

And, yes, I used println!(). Printing inside add() shows my data with the changes, but printing outside of add() shows nothing… Why?

(&Method::Post, "/add") => {
    let send = add(self.data.clone(), req); // prints data
    println!("Store:\n{}", self.data.borrow()); // prints empty
    send           
}

So I can’t process my data in another POST, because the server’s store is empty, but I need to store the received data and be able to process it at any later time.