Sending futures through a channel

Hey All,

I have a running async task that owns a single instance of a Thing and a channel receiver. Now I want to send it async functions/closures to run against the Thing, so each closure should take a mutable reference to the Thing (think actor).

I can't seem to figure out how to get these things across the channel. I've tried variations of this:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=7e4c96e71aa47683d814a0be8db244bf

Thanks!

You are not returning a boxed future. You need to box it.

fn main() {
    let (tx, rx) = channel::unbounded::<FuncType<Thing>>();

    tx.send(
        Box::new(
            |_thing: &mut Thing| Box::pin(async move {
                println!("Hello world");
            })
        )
    ).unwrap();
}
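For readers without the playground open, here is a dependency-free sketch of the same idea. The definitions of Thing, BoxFuture and FuncType below are assumptions (the real ones live in the playground), std::sync::mpsc stands in for the channel only to avoid external crates, and block_on is a toy executor that is good enough for futures that never actually wait.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the `Thing` in the question.
struct Thing {
    greetings: u32,
}

// A pinned, boxed future that may borrow data for 'a.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Assumed shape of `FuncType`: a boxed closure that borrows the target
// mutably and returns a boxed future tied to that borrow.
type FuncType<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send>;

// Waker that does nothing; enough for a busy-polling toy executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future in a loop until it completes.
fn block_on<T>(mut fut: BoxFuture<'_, T>) -> T {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

// Sends one boxed async closure through the channel, runs it against a
// Thing on the receiving side, and returns the Thing for inspection.
fn send_and_run() -> Thing {
    let (tx, rx) = mpsc::channel::<FuncType<Thing>>();

    tx.send(Box::new(|thing: &mut Thing| {
        // Box::pin turns the async block into the boxed future the type expects.
        Box::pin(async move {
            thing.greetings += 1;
            println!("Hello world");
        })
    }))
    .unwrap();
    drop(tx); // close the channel so the receive loop below ends

    let mut thing = Thing { greetings: 0 };
    for func in rx {
        block_on(func(&mut thing));
    }
    thing
}

fn main() {
    let thing = send_and_run();
    println!("greetings: {}", thing.greetings);
}
```

In a real async program you would swap the std channel for an async one and let the runtime drive the futures instead of block_on.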

Note that you should generally avoid crossbeam channels in async code as they are blocking. There's also a blog post about actors on the same blog.

Ahhh... That's where you put the pin!

Thanks, @alice. I will read your blog post more carefully within the next few days. I have a lot of C++ code that uses a really simple library I made which models Elixir-style call() and cast() wrappers. I'd like to port some of it to Rust, but I haven't seen anything that comes close to it yet. Your blog is as close as I've seen.

Hi @alice. I read your blog on actors, and it is very close to what I'm trying to do. I wish I had read it a year ago; would have saved me a lot of time!
https://ryhl.io/blog/actors-with-tokio/

There are a number of fairly minor differences in what I'm trying to do. One of the bigger ones is that instead of using a distinct message enum (ActorMessage) and a monolithic handler in the actor (handle_message()), I want the client calls (in what you call the handle object) to just post closures to the actor task to process.

So my eventual hope would be to have something that looks like:

impl MyActorHandle {
    ...

    pub async fn get_unique_id(&self) -> u32 {
        self.actor.call(|actor: &mut MyActor| async move {
            // Bump the counter and hand back the new value.
            actor.next_id += 1;
            actor.next_id
        }).await
    }
}

Here, call() would be generic over the block's return type: it would create a oneshot channel for that type, send the async block to the actor to process, and await the result.

I'm not quite there yet...

Ah, so something like this?

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2a41c86c7f2857bd5749945fe665bcd6
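
In case the link goes away, here is a rough std-only sketch of a call() like the one described above. It is not necessarily what the playground contains. Actor, Job and block_on are illustrative names, the actor runs on a plain thread with a toy executor, and call() blocks on a std channel used as a one-shot reply. An async version would await a oneshot receiver (for example tokio::sync::oneshot) instead.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// What actually crosses the channel: a closure with the return type erased.
type Job<S> = Box<dyn for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: busy-polls the future until it is ready.
fn block_on<T>(mut fut: BoxFuture<'_, T>) -> T {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        thread::yield_now();
    }
}

// Handle to a background thread that owns the state `S`.
struct Actor<S> {
    tx: mpsc::Sender<Job<S>>,
}

impl<S: Send + 'static> Actor<S> {
    fn new(mut state: S) -> Self {
        let (tx, rx) = mpsc::channel::<Job<S>>();
        thread::spawn(move || {
            // Run jobs one at a time until every sender is dropped.
            for job in rx {
                block_on(job(&mut state));
            }
        });
        Actor { tx }
    }

    // Generic over the closure's result type R: wraps the closure in a job
    // that sends the result back on a per-call reply channel.
    fn call<F, R>(&self, f: F) -> R
    where
        F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R> + Send + 'static,
        R: Send + 'static,
    {
        let (reply_tx, reply_rx) = mpsc::channel::<R>();
        let job: Job<S> = Box::new(move |state: &mut S| {
            Box::pin(async move {
                let value = f(state).await;
                let _ = reply_tx.send(value);
            })
        });
        self.tx.send(job).expect("actor thread has stopped");
        reply_rx.recv().expect("actor dropped the reply channel")
    }
}

fn main() {
    let counter = Actor::new(0u32);
    let id: u32 = counter.call(|state| {
        Box::pin(async move {
            *state += 1;
            *state
        })
    });
    println!("first id: {}", id);
}
```

The key trick is that the channel only ever carries closures returning BoxFuture<'a, ()>; the typed result travels back separately through the reply channel.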

Yes!!!!!

Hahaha. OMG, I've been trying to figure this out for four months now, and you just wrote it in under 15 minutes!

What is the for<'a> around the lifetime? That's new to me.

It means "for all lifetimes", in the sense that writing this is a way to say that the thing in the box must implement all of the traits in the following infinite list:

  • FnOnce(&'lifetime1 mut MyActor) -> BoxFuture<'lifetime1, ()>
  • FnOnce(&'lifetime2 mut MyActor) -> BoxFuture<'lifetime2, ()>
  • FnOnce(&'lifetime3 mut MyActor) -> BoxFuture<'lifetime3, ()>
  • FnOnce(&'lifetime4 mut MyActor) -> BoxFuture<'lifetime4, ()>
  • FnOnce(&'lifetime5 mut MyActor) -> BoxFuture<'lifetime5, ()>
  • FnOnce(&'lifetime6 mut MyActor) -> BoxFuture<'lifetime6, ()>
  • and so on for every possible lifetime
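
A small standalone illustration of the same bound, using string slices instead of the actor (apply_twice is a made-up helper, just for this example). The closure is called once with a short-lived borrow and once with a 'static one, which only type-checks because the bound holds for every lifetime:

```rust
// `F` must work for *every* lifetime 'a of the borrowed input, and the
// returned slice is tied to that same lifetime.
fn apply_twice<F>(f: F) -> (usize, usize)
where
    F: for<'a> Fn(&'a str) -> &'a str,
{
    // First call: the input only lives inside this block.
    let first = {
        let short_lived = String::from("  hello ");
        f(&short_lived).len()
    };
    // Second call: the input is a 'static string literal.
    let second = f("  static text  ").len();
    (first, second)
}

fn main() {
    let lens = apply_twice(|s| s.trim());
    println!("{:?}", lens); // prints "(5, 11)"
}
```

Writing the bound as Fn(&str) -> &str is shorthand for exactly this for<'a> form; it has to be spelled out in the actor case so the lifetime can also be named inside BoxFuture<'a, ()>.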

Thanks so much. I definitely owe you one!

You're welcome.

Just to follow up...

I used this to get the core of my library (called "cooper") to compile, and published it. I'm a little concerned about the number of allocations that are required for each call (3-4?), but I'll worry about optimizing a little later, if necessary.

It's crazy how little code it took to get it working (~75 lines).

https://crates.io/crates/cooper

Using the library is fairly simple. The implementation of a "unique ID" actor backed by a counting integer looks like this:

/// An actor that can create unique integer values from a counting integer.
#[derive(Clone)]
pub struct UniqueId {
    actor: Actor<u32>,
}

impl UniqueId {
    /// Create a new UniqueId actor
    pub fn new() -> Self {
        Self { actor: Actor::new() }
    }

    /// Gets a unique ID as the next integer value in the sequence.
    pub async fn get_unique_id(&self) -> u32 {
        self.actor.call(|state| Box::pin(async move {
            *state += 1;
            *state
        })).await
    }
}