Implementing the CompletableFuture abstraction on top of futures::sync::oneshot

#1

I would like to have something like Java 8 CompletableFuture, something that could be passed around between threads, so that one thread could wait for another to complete a task with a result.

I found a crate that provides this functionality (completable_future), which uses a Mutex underneath. Instead, I decided to try a futures::sync::oneshot channel. It is probably much heavier, but the implementation should be much easier to understand, because the goal of oneshot is basically the same as that of CompletableFuture. I also wanted to use this as an opportunity to learn Rust (I am a beginner).

But then I ran into ownership hell, which made me question whether Sender and Receiver can be passed around together at all, and whether the CompletableFuture abstraction can be built this way.

So, back to the requirements. I would like an interface that lets me do things like this:

use std::thread;
mod completable_future;
use completable_future::CompletableFuture;
use futures::future::Future;
use std::sync::Arc;

fn main() {
	let c1 = Arc::new(CompletableFuture::<u64>::new());
	let c2 = Arc::new(CompletableFuture::<bool>::new());

	let cloned_c1 = c1.clone();
	let cloned_c2 = c2.clone();
	let child = thread::spawn(move || {

		let c1 = cloned_c1;
		let c2 = cloned_c2;
		let result = 5; // in real life -> let result = some_lengthy_calculations_part1();
		c1.complete(result);
		let result = false; // in real life -> let result = some_long_calculations_part2();
		c2.complete(result);

	});

	let result_future1 = c1.into_future()
		.and_then(|x| {
			println!("Doing something with partial result1: {}", x);
			Ok(())
		});

	let result_future2 = c2.into_future()
		.and_then(|x| {
			println!("Doing something with partial result2: {}", x);
			Ok(())
		});

	result_future1.wait();
	result_future2.wait();

	child.join();
}

The implementation I came up with is:

use futures::sync::oneshot;
use futures::future::Future;

pub struct CompletableFuture<T> {
    receiver : oneshot::Receiver<T>,
    sender : oneshot::Sender<T>,
}

impl<T> CompletableFuture<T> {
    pub fn new() -> CompletableFuture<T>
    {
        let (sender, receiver) = oneshot::channel::<T>();
        CompletableFuture {
            sender,
            receiver
        }
    }

    pub fn into_future(&self) -> impl Future<Item =T, Error = oneshot::Canceled> {
        self.receiver
    }

    pub fn complete(&self, value: T) -> Result<(), T> {
        self.sender.send(value)
    }
}

Currently this gives (playground):

Compiling playground v0.0.1 (/playground)
error[E0507]: cannot move out of borrowed content
  --> src/main.rs:22:9
   |
22 |         self.receiver
   |         ^^^^^^^^^^^^^ cannot move out of borrowed content

error[E0507]: cannot move out of borrowed content
  --> src/main.rs:26:9
   |
26 |         self.sender.send(value)
   |         ^^^^^^^^^^^ cannot move out of borrowed content

Do you have any suggestions on how to build this abstraction?

#2

Have you tried defining into_future(self) instead of into_future(&self)?
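A minimal sketch of what that change allows, under stated assumptions: std::sync::mpsc stands in for futures::sync::oneshot so the snippet builds without external crates, and the method name split is hypothetical. A method that takes self by value owns the struct, so it may move the fields out; the same body behind &self is what produces E0507.

```rust
use std::sync::mpsc;

// Stand-in for the struct in the question. std::sync::mpsc replaces
// futures::sync::oneshot here (an assumption, to avoid external crates).
pub struct CompletableFuture<T> {
    sender: mpsc::Sender<T>,
    receiver: mpsc::Receiver<T>,
}

impl<T> CompletableFuture<T> {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel();
        CompletableFuture { sender, receiver }
    }

    // Hypothetical method: `self` is taken by value, so both fields can be
    // moved out. With `&self` the compiler would reject this with E0507.
    pub fn split(self) -> (mpsc::Sender<T>, mpsc::Receiver<T>) {
        (self.sender, self.receiver)
    }
}

fn main() {
    let (tx, rx) = CompletableFuture::<u64>::new().split();
    tx.send(5).expect("receiver was dropped");
    println!("got {}", rx.recv().expect("sender was dropped"));
}
```

Note that consuming self means the struct is gone after the call, which is why a single method returning both halves is used here rather than two separate consuming methods.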

#3

I tried, but this leads to another problem:

24 |     let result_future1 = c1.into_future()
   |                          ^^ cannot move out of an `Arc`

The good news is that I just discovered futures::oneshot, “a new in-memory oneshot used to represent completing a computation”, and I think this is exactly what I should use instead. I will give it a try tomorrow.
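For reference, the "cannot move out of an `Arc`" error arises because an Arc only hands out shared references, so a method taking self by value cannot be called through it. One possible workaround, sketched here under assumptions, is to wrap each half in a Mutex<Option<...>> and take() it out through &self. std::sync::mpsc again stands in for futures::sync::oneshot, and take_receiver is a hypothetical name; this is not how any particular crate is known to do it.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Sketch only: each half lives in a Mutex<Option<..>> so it can be taken
// out through a shared reference (and therefore through an Arc).
pub struct CompletableFuture<T> {
    sender: Mutex<Option<mpsc::Sender<T>>>,
    receiver: Mutex<Option<mpsc::Receiver<T>>>,
}

impl<T> CompletableFuture<T> {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        CompletableFuture {
            sender: Mutex::new(Some(tx)),
            receiver: Mutex::new(Some(rx)),
        }
    }

    // Completes at most once. Returns Err(value) if the future was already
    // completed or if the receiving half has been dropped.
    pub fn complete(&self, value: T) -> Result<(), T> {
        match self.sender.lock().unwrap().take() {
            Some(tx) => tx.send(value).map_err(|e| e.0),
            None => Err(value),
        }
    }

    // Hypothetical method: hands out the receiving half exactly once.
    pub fn take_receiver(&self) -> Option<mpsc::Receiver<T>> {
        self.receiver.lock().unwrap().take()
    }
}

fn main() {
    let c1 = Arc::new(CompletableFuture::<u64>::new());
    let cloned_c1 = Arc::clone(&c1);

    let child = thread::spawn(move || {
        cloned_c1.complete(5).expect("first completion should succeed");
    });

    let rx = c1.take_receiver().expect("receiver already taken");
    println!("Doing something with partial result1: {}", rx.recv().unwrap());
    child.join().expect("worker thread panicked");
}
```

The cost of this design is a lock and a runtime check on every call, in exchange for keeping the Arc-based interface from the question.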

#4

I believe the reason there are separate “Receiver” and “Sender” halves is that you are meant to give the “Sender” half to the sending thread and the “Receiver” half to the receiving thread. I’m not sure it makes sense to keep both together in one struct, let alone inside an Arc.
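A minimal sketch of that split, mirroring c1 and c2 from the question. It uses std::sync::mpsc as a stand-in so it builds without external crates; with futures::sync::oneshot the shape is the same, since oneshot::channel() also returns a (Sender, Receiver) pair, although there send consumes the Sender and the Receiver is itself a Future. The function name run_two_stage_job is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: the worker thread owns the sending halves, the
// calling thread owns the receiving halves. No Arc is needed.
fn run_two_stage_job() -> (u64, bool) {
    let (tx1, rx1) = mpsc::channel::<u64>();
    let (tx2, rx2) = mpsc::channel::<bool>();

    // `move` transfers ownership of tx1 and tx2 into the worker thread.
    let child = thread::spawn(move || {
        let result = 5; // placeholder for the first lengthy calculation
        tx1.send(result).expect("receiver for part 1 was dropped");
        let result = false; // placeholder for the second lengthy calculation
        tx2.send(result).expect("receiver for part 2 was dropped");
    });

    // The receiving halves never leave this thread.
    let first = rx1.recv().expect("sender for part 1 was dropped");
    let second = rx2.recv().expect("sender for part 2 was dropped");

    child.join().expect("worker thread panicked");
    (first, second)
}

fn main() {
    let (first, second) = run_two_stage_job();
    println!("Doing something with partial result1: {}", first);
    println!("Doing something with partial result2: {}", second);
}
```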

#5

You are probably right. Apparently this is the most Rusty way to do it, although I would have expected the compiler to allow moving the receiver and the sender to two different threads in the code example I wrote.

In the end I decided to use futures::sync::oneshot directly, because it turned out that futures::oneshot in the latest release (0.1.25) is just a re-export of futures::sync::oneshot, and it will be deprecated soon (above 0.1.4).

But it works, and conceptually it is equivalent to CompletableFuture (except for the ability to time out while waiting, but that’s another story). Exactly what I needed.
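On the timeout point, a small sketch of what a bounded wait can look like, assuming the standard library's std::sync::mpsc channel is an acceptable substitute; this is not a feature of futures::sync::oneshot, and the helper name wait_up_to is hypothetical.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: waits at most `limit` for a value. Returns None on
// timeout, and also if the sender was dropped without sending.
fn wait_up_to<T>(rx: &mpsc::Receiver<T>, limit: Duration) -> Option<T> {
    match rx.recv_timeout(limit) {
        Ok(value) => Some(value),
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => None,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u64>();

    let child = thread::spawn(move || {
        tx.send(5).expect("receiver was dropped");
    });

    match wait_up_to(&rx, Duration::from_secs(1)) {
        Some(value) => println!("completed with {}", value),
        None => println!("gave up waiting"),
    }
    child.join().expect("worker thread panicked");
}
```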
