Recover from panics and errors in future chains


When working with future chains, I am aware of three places where errors and exceptions can occur (I come from the Scala world, but the same seems to apply in Rust):

  1. exception during construction of the future chain
  2. exception in the closure during execution of the chain
  3. explicit return of completed unsuccessful future

I have attempted to implement recovery in one single place using panic::catch_unwind, but I cannot get past the compiler errors. I suspect that I am doing something wrong and that there is a better way of doing this.

In Scala, there is a dedicated recover call on a Future that handles cases 2 and 3: it maps an error/exception to a successful future result. Case 1 is covered by try ... catch or by scala.util.Try.
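
To make the comparison concrete, here is a minimal sketch of the behaviour I am after, written against std only (no futures involved). The helper name `recover` and the function `parse_x` are hypothetical and exist only for this illustration. The use of `AssertUnwindSafe` is an assumption on my side that the captured state is not relied upon after a panic. Note that this only covers synchronous code: a panic raised later, while a future is being polled, would not pass through this call.

```rust
use std::num::ParseIntError;
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical helper (not part of any library): runs `work` and maps
/// both a panic and an explicit `Err` to `fallback`.
fn recover<T, E, F>(work: F, fallback: T) -> T
where
    F: FnOnce() -> Result<T, E>,
{
    // Assumption: state captured by `work` is not observed in a broken
    // condition after a panic, so opting out of the UnwindSafe check is fine.
    match panic::catch_unwind(AssertUnwindSafe(work)) {
        Ok(Ok(value)) => value,
        Ok(Err(_)) | Err(_) => fallback,
    }
}

/// Hypothetical fallible operation standing in for real request handling.
fn parse_x(input: &str) -> Result<i32, ParseIntError> {
    input.trim().parse::<i32>()
}

fn main() {
    // Success passes through untouched.
    let ok = recover(|| parse_x("42"), -1);
    // An explicit error (like case 3) is mapped to the fallback.
    let failed = recover(|| parse_x("not a number"), -1);
    // A panic inside the closure (like case 2) is mapped to the fallback too.
    let panicked = recover(
        || -> Result<i32, ParseIntError> { Ok(parse_x("oops").unwrap()) },
        -1,
    );
    println!("{} {} {}", ok, failed, panicked);
}
```

The default panic hook still prints the panic message to stderr; only the unwinding is stopped at the recovery point.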

Could you please advise what mechanisms are available for recovering from cases 1, 2 and 3 in Rust?

Here is my code (it compiles if I remove the catch_unwind call and the block where its result is handled). The three places where I attempt to cause an error or a panic are marked in comments as #1, #2 and #3.

extern crate futures;
extern crate hyper;
extern crate tokio_core;

extern crate serde;
extern crate serde_json;

use futures::future::Future;

use hyper::server::{Http, Request, Response, Service};

use futures::Stream;

use std::panic;
use std::result;
use std::error;

#[derive(Serialize, Deserialize, Debug)]
struct Point {
    x: i32,
    y: i32,
}

struct AgentService;

impl AgentService {
    fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
        let full_body = body.concat2()
            .map(|chunk| {
                let v = chunk.to_vec();
                String::from_utf8_lossy(&v).to_string()
            });
        Box::new(full_body)

        // PLACE #1: panics during the construction of the future chain
        // unimplemented!()
    }
}

impl Service for AgentService {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, _req: Request) -> Box<Future<Item=Self::Response, Error=Self::Error>> {
        let result = panic::catch_unwind(|| {
            println!("Received {:?}", _req);
            let resp: Box<Future<Item=Self::Response, Error=Self::Error>> = match _req.path() {
                "/avl" if _req.method() == &hyper::Method::Get => {
                    Box::new(self.get_body(_req.body())
                        .map(|r| {
                            println!("{:?}", r);
                            // PLACE #2: panics during future chain execution if no body content is sent
                            let deserialized: Point = serde_json::from_str(&r).unwrap();
                            println!("deserialized: {:?}", deserialized);
                            let resp = Response::new().with_status(hyper::StatusCode::Created);
                            resp
                        }))
                }
                _ => {
                    let resp = Response::new().with_status(hyper::StatusCode::NotFound);
                    Box::new(futures::future::ok(resp))
                    // PLACE #3: returns failed future result intentionally either:
                    // - during construction of the chain
                    // - or during execution of the chain
                    // Box::new(futures::future::err(hyper::Error::Method))
                }
            };
            resp
        });

        //println!("Caught {:?}", result);
        // PLACE where final recovery happens for all of the cases
        match result {
            Ok(f) => {
                return f;
            }
            Err(e) => {
                return Box::new(futures::future::ok(Response::new().with_status(hyper::StatusCode::InternalServerError)));
            }
        }
    }
}

pub fn run() {
    let addr = "127.0.0.1:3000".parse().unwrap();
    let server = Http::new().bind(&addr, || Ok(AgentService)).unwrap();
    server.run().unwrap();
}

Here are the compiler errors:

error[E0277]: the trait bound `std::cell::UnsafeCell<*mut futures::sync::mpsc::queue::Node<std::option::Option<std::result::Result<hyper::Chunk, hyper::Error>>>>: std::panic::RefUnwindSafe` is not satisfied in `futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>`
  --> src\server.rs:47:22
   |
47 |         let result = panic::catch_unwind(|| {
   |                      ^^^^^^^^^^^^^^^^^^^ the type std::cell::UnsafeCell<*mut futures::sync::mpsc::queue::Node<std::option::Option<std::result::Result<hyper::Chunk, hyper::Error>>>> may contain interior mutability and a reference may not be safely transferrable across a catch_unwind boundary
   |
   = help: within `futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>`, the trait `std::panic::RefUnwindSafe` is not implemented for `std::cell::UnsafeCell<*mut futures::sync::mpsc::queue::Node<std::option::Option<std::result::Result<hyper::Chunk, hyper::Error>>>>`
   = note: required because it appears within the type `futures::sync::mpsc::queue::Queue<std::option::Option<std::result::Result<hyper::Chunk, hyper::Error>>>`
   = note: required because it appears within the type `futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>`
   = note: required because of the requirements on the impl of `std::panic::UnwindSafe` for `std::sync::Arc<futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>>`
   = note: required because it appears within the type `futures::sync::mpsc::Receiver<std::result::Result<hyper::Chunk, hyper::Error>>`
   = note: required because it appears within the type `tokio_proto::streaming::body::Inner<hyper::Chunk, hyper::Error>`
   = note: required because it appears within the type `tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>`
   = note: required because it appears within the type `hyper::Body`
   = note: required because it appears within the type `std::option::Option<hyper::Body>`
   = note: required because it appears within the type `hyper::Request`
   = note: required because it appears within the type `[closure@src\server.rs:47:42: 71:10 _req:hyper::Request, self:&&server::AgentService]`
   = note: required by `std::panic::catch_unwind`

error[E0277]: the trait bound `std::cell::UnsafeCell<*mut futures::sync::mpsc::queue::Node<std::sync::Arc<std::sync::Mutex<futures::sync::mpsc::SenderTask>>>>: std::panic::RefUnwindSafe` is not satisfied in `futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>`
  --> src\server.rs:47:22
   |
47 |         let result = panic::catch_unwind(|| {
   |                      ^^^^^^^^^^^^^^^^^^^ the type std::cell::UnsafeCell<*mut futures::sync::mpsc::queue::Node<std::sync::Arc<std::sync::Mutex<futures::sync::mpsc::SenderTask>>>> may contain interior mutability and a reference may not be safely transferrable across a catch_unwind boundary
   |
   = help: within `futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>`, the trait `std::panic::RefUnwindSafe` is not implemented for `std::cell::UnsafeCell<*mut futures::sync::mpsc::queue::Node<std::sync::Arc<std::sync::Mutex<futures::sync::mpsc::SenderTask>>>>`
   = note: required because it appears within the type `futures::sync::mpsc::queue::Queue<std::sync::Arc<std::sync::Mutex<futures::sync::mpsc::SenderTask>>>`
   = note: required because it appears within the type `futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>`
   = note: required because of the requirements on the impl of `std::panic::UnwindSafe` for `std::sync::Arc<futures::sync::mpsc::Inner<std::result::Result<hyper::Chunk, hyper::Error>>>`
   = note: required because it appears within the type `futures::sync::mpsc::Receiver<std::result::Result<hyper::Chunk, hyper::Error>>`
   = note: required because it appears within the type `tokio_proto::streaming::body::Inner<hyper::Chunk, hyper::Error>`
   = note: required because it appears within the type `tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>`
   = note: required because it appears within the type `hyper::Body`
   = note: required because it appears within the type `std::option::Option<hyper::Body>`
   = note: required because it appears within the type `hyper::Request`
   = note: required because it appears within the type `[closure@src\server.rs:47:42: 71:10 _req:hyper::Request, self:&&server::AgentService]`
   = note: required by `std::panic::catch_unwind`

error[E0277]: the trait bound `hyper::header::Header + std::marker::Send + std::marker::Sync + 'static: std::panic::UnwindSafe` is not satisfied
  --> src\server.rs:47:22
   |
47 |         let result = panic::catch_unwind(|| {
   |                      ^^^^^^^^^^^^^^^^^^^ the type hyper::header::Header + std::marker::Send + std::marker::Sync + 'static may not be safely transferred across an unwind boundary
   |
   = help: the trait `std::panic::UnwindSafe` is not implemented for `hyper::header::Header + std::marker::Send + std::marker::Sync + 'static`
   = note: required because of the requirements on the impl of `std::panic::UnwindSafe` for `std::ptr::Unique<hyper::header::Header + std::marker::Send + std::marker::Sync + 'static>`
   = note: required because it appears within the type `std::boxed::Box<hyper::header::Header + std::marker::Send + std::marker::Sync + 'static>`
   = note: required because it appears within the type `hyper::header::internals::cell::PtrMap<std::boxed::Box<hyper::header::Header + std::marker::Send + std::marker::Sync + 'static>>`
   = note: required because it appears within the type `std::cell::UnsafeCell<hyper::header::internals::cell::PtrMap<std::boxed::Box<hyper::header::Header + std::marker::Send + std::marker::Sync + 'static>>>`
   = note: required because it appears within the type `hyper::header::internals::cell::PtrMapCell<hyper::header::Header + std::marker::Send + std::marker::Sync + 'static>`
   = note: required because it appears within the type `hyper::header::internals::item::Item`
   = note: required because it appears within the type `(hyper::header::HeaderName, hyper::header::internals::item::Item)`
   = note: required because of the requirements on the impl of `std::panic::UnwindSafe` for `std::ptr::Unique<(hyper::header::HeaderName, hyper::header::internals::item::Item)>`
   = note: required because it appears within the type `alloc::raw_vec::RawVec<(hyper::header::HeaderName, hyper::header::internals::item::Item)>`
   = note: required because it appears within the type `std::vec::Vec<(hyper::header::HeaderName, hyper::header::internals::item::Item)>`
   = note: required because it appears within the type `hyper::header::internals::vec_map::VecMap<hyper::header::HeaderName, hyper::header::internals::item::Item>`
   = note: required because it appears within the type `hyper::Headers`
   = note: required because it appears within the type `hyper::Request`
   = note: required because it appears within the type `[closure@src\server.rs:47:42: 71:10 _req:hyper::Request, self:&&server::AgentService]`
   = note: required by `std::panic::catch_unwind`
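
As far as I can tell, all three errors have the same cause: the closure captures a hyper::Request, which contains types with interior mutability (the UnsafeCell entries in the notes), so the closure is not UnwindSafe. Below is a std-only sketch that reproduces the same situation in miniature. `FakeRequest`, `handle` and `call` are hypothetical stand-ins, not hyper APIs. Wrapping the closure in `AssertUnwindSafe` makes it compile, but that is my own promise that the captured value is not used in a broken state afterwards, and I am not sure it is the recommended approach.

```rust
use std::cell::RefCell;
use std::panic::{self, AssertUnwindSafe};

/// Stand-in for a request type with interior mutability (hypothetical;
/// the real `hyper::Request` is more involved).
struct FakeRequest {
    path: String,
    hits: RefCell<u32>,
}

/// Returns a status code; panics for the "/boom" path to simulate a bug.
fn handle(req: &FakeRequest) -> u16 {
    *req.hits.borrow_mut() += 1;
    if req.path == "/boom" {
        panic!("handler failed");
    }
    200
}

/// Single recovery point: a panic in `handle` becomes a 500.
fn call(req: &FakeRequest) -> u16 {
    // `panic::catch_unwind(|| handle(req))` is rejected by the compiler,
    // because `&FakeRequest` is not `UnwindSafe` (it contains a `RefCell`).
    // `AssertUnwindSafe` opts out of that check.
    panic::catch_unwind(AssertUnwindSafe(|| handle(req))).unwrap_or(500)
}

fn main() {
    let ok = FakeRequest { path: "/avl".to_string(), hits: RefCell::new(0) };
    let bad = FakeRequest { path: "/boom".to_string(), hits: RefCell::new(0) };
    println!("{} {}", call(&ok), call(&bad));
}
```

This only catches panics that happen while `call` itself runs (case 1 in my list); anything that panics later, while the returned future executes, would need a separate mechanism.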
