Hey guys, I am trying to integrate futures into a design and am struggling a little. The existing architecture contains an event queue, and these are the simple steps that take place: the C front-end calls an FFI function -> the request gets posted to the event loop -> the request is handled and an async request is made into another crate -> the result is processed by that other crate (which could do networking etc.) and is posted back to our event loop -> the callback given by C is invoked to return the result. Since the interaction with the other Rust crate works in a particular way, I cannot change that part. However, my crate has a callback-based design which I want to convert to a future-based one. To demo the simple procedure outlined above, here is some code:
// FFI function - so one Sender to event loop is with the C code as opaque handle
// This will post the request to the event loop.
#[no_mangle]
pub unsafe extern "C" fn ffi_request(handle: *mut Sender<CoreMsg>, callback: unsafe extern "C" fn(i32)) {
    (*handle).send(CoreMsg::new(move |cptr| {
        let cptr_clone = cptr.clone();
        cptr.borrow_mut().f0(move |res_0| {
            cptr_clone.borrow_mut().f1(res_0, move |res_1| {
                let final_res: i32 = some_transformation(res_1);
                callback(final_res);
            })
        })
    }));
}
// The main core event loop
fn run_core_event_loop(core_rx: mpsc::Receiver<CoreMsg>) {
    thread::spawn(move || {
        let cptr = Rc::new(RefCell::new(Core::new()));
        for msg in core_rx.iter() {
            (msg.0)(&cptr);
        }
    });
}
// Core state
#[derive(Copy, Clone)]
enum Event {
    Res0,
    Res1,
}
struct Core {
    queue: HashMap<u32, Box<FnMut(Event)>>,
}
impl Core {
    fn new() -> Self { ... }
    fn f0<F: FnMut(Event) + 'static>(&mut self, f: F) {
        let id: u32 = unique_id();
        self.queue.insert(id, Box::new(f));
        other_crate.async_call_0(id);
    }
    fn f1<F: FnMut(Event) + 'static>(&mut self, val: Event, f: F) {
        let id: u32 = unique_id();
        self.queue.insert(id, Box::new(f));
        other_crate.async_call_1(id, val);
    }
}
// Receive async notification from other crate and post it to our loop
fn rx_from_other_crate(rx: mpsc::Receiver<(u32, Event)>, core_tx: mpsc::Sender<CoreMsg>) {
    thread::spawn(move || {
        for it in rx.iter() {
            // This will post the request to the event loop.
            core_tx.send(CoreMsg::new(move |cptr| {
                let mut f = cptr.borrow_mut().queue.remove(&it.0).unwrap();
                f(it.1); // ----------- MARK-1
            }));
        }
    });
}
CoreMsg itself is not important, but for the sake of completeness here is how it looks:
struct CoreMsg(Box<FnMut(&Rc<RefCell<Core>>) + Send + 'static>);
impl CoreMsg {
    fn new<F: FnOnce(&Rc<RefCell<Core>>) + Send + 'static>(f: F) -> Self {
        let mut f = Some(f);
        CoreMsg(Box::new(move |cptr| {
            let f = f.take().unwrap();
            f(cptr);
        }))
    }
}
All it does is make up for the lack of Box<FnOnce> in Rust right now.
So as you can see, it is a pretty simple design where both sides (the FFI side and the other-crate response receiver) post their respective work to the core event loop, which does the processing. In complex code the callbacks end up nesting deeply and create inversion of control. I believe futures can help here tremendously; the trouble is that I am not sure how to do it correctly. I tried using a Complete<Event> <-> Oneshot<Event> pair: oneshot() produces the pair, Core::f0 and Core::f1 return the Oneshot<Event>, and Core::queue changes to HashMap<u32, Complete<Event>>. With that, MARK-1 changes to f.complete(it.1) (where f is now a Complete instead of a callback/function). However, as you will notice, this design becomes difficult: Core::f0 registers a Complete<Event> and returns a Oneshot<Event> (so far so good), but Core::f1 also does that with a separate pair of its own. Transformed to futures, the relevant changes look like:
// FFI function - so one Sender to event loop is with the C code as opaque handle
// This will post the request to the event loop.
#[no_mangle]
pub unsafe extern "C" fn ffi_request(handle: *mut Sender<CoreMsg>, callback: unsafe extern "C" fn(i32)) {
    (*handle).send(CoreMsg::new(move |cptr| {
        let cptr_clone = cptr.clone();
        let (id, fut_0) = cptr.borrow_mut().f0();
        let tail_0 = fut_0.map(move |res_0| {
            let (id, fut_1) = cptr_clone.borrow_mut().f1(res_0);
            let tail_1 = fut_1.map(move |res_1| {
                let final_res: i32 = some_transformation(res_1);
                callback(final_res);
            }).map_err(|_| ());
            cptr_clone.borrow_mut().queue.get_mut(&id).unwrap().1 = Some(Box::new(tail_1));
        }).map_err(|_| ());
        cptr.borrow_mut().queue.get_mut(&id).unwrap().1 = Some(Box::new(tail_0));
    }));
}
struct Core {
    queue: HashMap<u32, (Complete<Event>, Option<Box<Future<Item=(), Error=()>>>)>,
}
impl Core {
    fn f0(&mut self) -> (u32, Oneshot<Event>) {
        let (tx, rx) = futures::oneshot();
        let id: u32 = unique_id();
        self.queue.insert(id, (tx, None));
        other_crate.async_call_0(id);
        (id, rx)
    }
    fn f1(&mut self, val: Event) -> (u32, Oneshot<Event>) {
        let (tx, rx) = futures::oneshot();
        let id: u32 = unique_id();
        self.queue.insert(id, (tx, None));
        other_crate.async_call_1(id, val);
        (id, rx)
    }
}
// Receive async notification from other crate and post it to our loop
fn rx_from_other_crate(rx: mpsc::Receiver<(u32, Event)>, core_tx: mpsc::Sender<CoreMsg>) {
    thread::spawn(move || {
        for it in rx.iter() {
            // This will post the request to the event loop.
            core_tx.send(CoreMsg::new(move |cptr| {
                let f = cptr.borrow_mut().queue.remove(&it.0).unwrap();
                f.0.complete(it.1);
                // Although this looks like a blocking wait, it won't block, because
                // we just completed the head future above via the f.0.complete() call.
                f.1.unwrap().wait(); // will run immediately
            }));
        }
    });
}
However, as you can see, this has become worse: the callback nesting is still there in the FFI function, which is exactly what I wanted to avoid. I was looking for a solution which does not block and looks like:
let fut_1 = f0();
let fut_2 = fut_1.and_then(|res_0| ... );
Although I have succeeded in not blocking (my design is entirely notification based, consisting of only one event-loop thread with everything executing there - I don't want many threads, a thread pool, or a blocking design, and I also don't want any Arcs and Mutexes), I have not succeeded in the kind of design I would consider ideal with futures. Can somebody help me understand how futures can be used in such a scenario? I guess this could serve as a tutorial for anybody who wants a similar single-threaded, event-loop based solution.