Futures and Async Design

Hey guys, I am trying to integrate futures into a design and am struggling a little. The existing architecture contains an event queue, and these are the simple steps that take place: the C front-end calls an FFI function -> the request gets posted to the event loop -> the request is handled and an async request is made into another crate -> the result is processed by that other crate (which could do networking etc.) and is posted back to our event loop -> the callback given by C is called to return the result. Since my crate interacts with the other Rust crate in a particular way, I cannot change that part. However, inside my crate I have a callback-based design which I want to convert to a future-based one. To demo the simple procedure outlined above, here is some code:

// FFI function - so one Sender to event loop is with the C code as opaque handle
// This will post the request to the event loop.
#[no_mangle]
pub unsafe extern "C" fn ffi_request(handle: *mut Sender<CoreMsg>, callback: unsafe extern "C" fn(i32)) {
    (*handle).send(CoreMsg::new(move |cptr| {
        let cptr_clone = cptr.clone();
        cptr.borrow_mut().f0(move |res_0| {
            cptr_clone.borrow_mut().f1(res_0, move |res_1| {
                let final_res: i32 = some_transformation(res_1);
                unsafe { callback(final_res) };
            })
        })
    }));
}

// The main core event loop
fn run_core_event_loop(core_rx: mpsc::Receiver<CoreMsg>) {
    thread::spawn(move || {
        let cptr = Rc::new(RefCell::new(Core::new()));

        for mut msg in core_rx.iter() {
            (msg.0)(&cptr);
        }
    });
}

// Core state
#[derive(Copy, Clone)]
enum Event {
    Res0,
    Res1,
}
struct Core {
    queue: HashMap<u32, Box<FnMut(Event)>>,
}
impl Core {
    fn new() -> Self { ... }

    fn f0<F: FnMut(Event) + 'static>(&mut self, f: F) {
        let id: u32 = unique_id();
        self.queue.insert(id, Box::new(f));

        other_crate.async_call_0(id);
    }

    fn f1<F: FnMut(Event) + 'static>(&mut self, val: Event, f: F) {
        let id: u32 = unique_id();
        self.queue.insert(id, Box::new(f));

        other_crate.async_call_1(id, val);
    }
}

// Receive async notification from other crate and post it to our loop
fn rx_from_other_crate(rx: mpsc::Receiver<(u32, Event)>, core_tx: mpsc::Sender<CoreMsg>) {
    thread::spawn(move || {
        for it in rx.iter() {
            // This will post the request to the event loop.
            core_tx.send(CoreMsg::new(move |cptr| {
                let mut f = cptr.borrow_mut().queue.remove(&it.0).unwrap();
                f(it.1); // ----------- MARK-1
            }));
        }
    });
}

CoreMsg itself is not that important, but for the sake of completeness here is how it looks:

struct CoreMsg(Box<FnMut(&Rc<RefCell<Core>>) + Send + 'static>);
impl CoreMsg {
    fn new<F: FnOnce(&Rc<RefCell<Core>>) + Send + 'static>(f: F) -> Self {
        let mut f = Some(f);
        CoreMsg(Box::new(move |cptr| {
            // Emulate FnOnce via Option::take - panics if invoked twice.
            let f = f.take().unwrap();
            f(cptr);
        }))
    }
}

All it does is make up for the current lack of a usable Box<FnOnce> in Rust.
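
For example, the event loop is expected to consume it roughly like this (just a usage sketch, nothing new; cptr is the same Rc<RefCell<Core>> that the loop thread owns):

let cptr = Rc::new(RefCell::new(Core::new()));

// Construct once with an FnOnce...
let mut msg = CoreMsg::new(|state| {
    // ...which later runs on the event-loop thread with access to the core state.
    let _core = state.borrow_mut();
});

// ...and invoke exactly once; a second call would panic on the take().unwrap() inside.
(msg.0)(&cptr);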

So as you can see, it's a pretty simple design where both sides (the FFI and the other-crate response receiver) post their respective work to the core event loop, which does some processing. In complex code the callbacks end up nesting deeply and create an inversion of control. I believe futures can help here tremendously; the trouble is I am not sure how to do it correctly. I tried using a Complete<Event> <-> Oneshot<Event> pair (from futures::oneshot()), returning the Oneshot<Event> from Core::f0 and Core::f1 and changing Core::queue to HashMap<u32, Complete<Event>>. With that, MARK-1 changes to f.complete(it.1) (where f is now the Complete half instead of a callback/function).

However, as you will notice, this design becomes awkward: Core::f0 registers a Complete<Event> and returns an Oneshot<Event> (so far so good), but Core::f1 also does that with a separate pair of its own.

After transforming the above to futures, the relevant changes look like:

// FFI function - so one Sender to event loop is with the C code as opaque handle
// This will post the request to the event loop.
#[no_mangle]
pub unsafe extern "C" fn ffi_request(handle: *mut Sender<CoreMsg>, callback: unsafe extern "C" fn(i32)) {
    (*handle).send(CoreMsg::new(move |cptr| {
        let cptr_clone = cptr.clone();
        let (id, fut_0) = cptr.borrow_mut().f0();

        let tail_0 = fut_0.map(move |res_0| {
            let (id, fut_1) = cptr_clone.borrow_mut().f1(res_0);
            let tail_1 = fut_1.map(move |res_1| {
                let final_res: i32 = some_transformation(res_1);
                unsafe { callback(final_res) };
            }).map_err(|_| ());

            cptr_clone.borrow_mut().queue.get_mut(&id).unwrap().1 = Some(Box::new(tail_1));
        }).map_err(|_| ());

        cptr.borrow_mut().queue.get_mut(&id).unwrap().1 = Some(Box::new(tail_0));
    }));
}

struct Core {
    queue: HashMap<u32, (Complete<Event>, Option<Box<Future<Item = (), Error = ()>>>)>,
}
impl Core {
    fn f0(&mut self) -> (u32, Oneshot<Event>) {
        let (tx, rx) = futures::oneshot();
        let id: u32 = unique_id();
        self.queue.insert(id, (tx, None));

        other_crate.async_call_0(id);

        (id, rx)
    }

    fn f1(&mut self, val: Event) -> (u32, Oneshot<Event>) {
        let (tx, rx) = futures::oneshot();
        let id: u32 = unique_id();
        self.queue.insert(id, (tx, None));

        other_crate.async_call_1(id, val);

        (id, rx)
    }
}

// Receive async notification from other crate and post it to our loop
fn rx_from_other_crate(rx: mpsc::Receiver<(u32, Event)>, core_tx: mpsc::Sender<CoreMsg>) {
    thread::spawn(move || {
        for it in rx.iter() {
            // This will post the request to the event loop.
            core_tx.send(CoreMsg::new(move |cptr| {
                let f = cptr.borrow_mut().queue.remove(&it.0).unwrap();
                f.0.complete(it.1);
                // Although this looks like a blocking wait, it won't block because we just
                // completed the `head` future above via the `f.0.complete()` call.
                f.1.unwrap().wait(); // will immediately run
            }));
        }
    });
}

However, as you can see, this has arguably become worse: the callback nesting I wanted to avoid is still there in the FFI function. I was looking for a solution which does not block and looks like:

let fut_1 = f0();
let fut_2 = fut_1.and_then(|res_0| ... );

Although I have succeeded in not blocking (my design is entirely notification based, consisting of only one event-loop thread with everything executing there - I don't want many threads, a thread pool, or a blocking design, and I also don't want any Arcs and Mutexes), I have not arrived at the kind of design I would consider ideal with futures.

Can somebody help me understand how futures can be used in such a scenario? I guess this could also serve as a tutorial for anybody who wants a similar single-threaded, event-loop based solution.

I think I have figured it out. I went through both futures and tokio (and it was some learning curve), and in the end I suppose futures-rs on its own is not meant to be used in such cases (it becomes too complicated). tokio-core provides a very nice abstraction and has solved it much more easily. All we need to do is create a tokio_core::reactor::Core, obtain a Handle from it, use that to create a tokio_core::channel::channel::<CoreMsg>(), distribute the Sender half to all the other threads from which we need to funnel messages into the main event loop, and finally run the Receiver half on the Core event loop (which is our main event loop).

So in a nutshell, something like this:

pub fn run_core_event_loop() {
    let cptr = Rc::new(RefCell::new(MyState::new()));

    let mut el = Core::new().unwrap();
    let el_h = el.handle();
    let (tx, rx) = channel::channel::<CoreMsg>(&el_h).unwrap();

    // Now distribute `tx` and its clones to other threads - in my case as an opaque ptr handle to C (FFI) at one end
    // and to the response listener thread (which listens to responses from other crate) on the other end.

    // In my mind this becomes like boost::asio::io_service::work (if it helps someone)
    let el_work = rx.for_each(|mut core_msg| {
        if let Some(tail) = (core_msg.0)(&cptr) {
            el_h.spawn(tail);
        }
        Ok(())
    });

    let _ = el.run(el_work);
}

CoreMsg, as explained earlier, can be thought of as Box<FnOnce(&Rc<RefCell<MyState>>) -> Option<Box<Future<Item = (), Error = ()>>> + Send + 'static>.
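
Written out, it is the same FnOnce-in-FnMut workaround as before, only with the new return type (just a sketch of what I have):

struct CoreMsg(Box<FnMut(&Rc<RefCell<MyState>>) -> Option<Box<Future<Item = (), Error = ()>>> + Send + 'static>);
impl CoreMsg {
    fn new<F>(f: F) -> Self
        where F: FnOnce(&Rc<RefCell<MyState>>) -> Option<Box<Future<Item = (), Error = ()>>> + Send + 'static
    {
        let mut f = Some(f);
        CoreMsg(Box::new(move |state| {
            // Emulate FnOnce via Option::take - panics if invoked twice.
            let f = f.take().unwrap();
            f(state)
        }))
    }
}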

A sample impl of MyState now looks like:

impl MyState {
    fn f0(&mut self) -> Oneshot<Event> {
        let (tx, rx) = futures::oneshot();
        let id: u32 = unique_id();
        self.queue.insert(id, tx);

        other_crate.async_call_0(id);

        rx
    }
}

And the rest is now very easy to guess.
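
For completeness, here is roughly what the FFI entry point from the beginning now looks like. This is a sketch under my assumptions (the opaque handle now wraps a channel::Sender<CoreMsg>, and MyState::f1 mirrors f0 but takes the Event produced by the first step); the point is that the nesting flattens into an and_then chain, and the returned future is what run_core_event_loop spawns on the reactor:

#[no_mangle]
pub unsafe extern "C" fn ffi_request(handle: *mut channel::Sender<CoreMsg>,
                                     callback: unsafe extern "C" fn(i32)) {
    let _ = (*handle).send(CoreMsg::new(move |cptr| {
        let cptr_clone = cptr.clone();

        // Flat chain instead of nested callbacks.
        let tail = cptr.borrow_mut()
            .f0()
            .and_then(move |res_0| cptr_clone.borrow_mut().f1(res_0))
            .map(move |res_1| {
                let final_res: i32 = some_transformation(res_1);
                unsafe { callback(final_res) };
            })
            .map_err(|_| ());

        // Hand the chain back to the event loop, which spawns it via el_h.spawn(tail).
        Some(Box::new(tail) as Box<Future<Item = (), Error = ()>>)
    }));
}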

I hope I've understood and done it correctly.

I do have a few questions though (assuming this is at all the right way to solve the problem):

  1. It's a bit inconvenient that tokio_core::reactor::Core is not Send, while channel::Sender is Send and is meant to be distributed to other threads. So in some cases I have to use a throwaway mpsc::channel just to transmit the channel::Sender out of the event-loop thread (see the sketch after this list) - this seems a little strange.
  2. I could not (yet) find a graceful way to terminate the event loop - I need to return an Err from inside rx.for_each above to do it. Maybe there should be another mechanism - say, returning Ok(None) to terminate the stream and hence, ultimately, the core event loop.
  3. Writing the CoreMsg workaround is tedious. I wish Box<FnOnce> were usable - I need it in many places in my async code. Any idea when this might land, or is it not a high-priority item?
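
For reference, the workaround in point 1 currently looks something like this. It is just a sketch (start_core_event_loop is a made-up name): the throwaway std mpsc channel exists only to hand the tokio channel::Sender back out of the event-loop thread, because the reactor (and hence the Handle that channel::channel() needs) has to live on that thread:

fn start_core_event_loop() -> channel::Sender<CoreMsg> {
    // Throwaway std channel, used exactly once to smuggle the tokio Sender
    // out of the event-loop thread.
    let (boot_tx, boot_rx) = mpsc::channel();

    thread::spawn(move || {
        let cptr = Rc::new(RefCell::new(MyState::new()));

        let mut el = Core::new().unwrap();
        let el_h = el.handle();
        let (tx, rx) = channel::channel::<CoreMsg>(&el_h).unwrap();

        // Hand the Sender back to the spawning thread, then run the loop as before.
        let _ = boot_tx.send(tx);

        let el_work = rx.for_each(|mut core_msg| {
            if let Some(tail) = (core_msg.0)(&cptr) {
                el_h.spawn(tail);
            }
            Ok(())
        });
        let _ = el.run(el_work);
    });

    boot_rx.recv().unwrap()
}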