[actix] How to use ctx.wait with SyncContext?

When the code below is executed with the asynchronous context everything works fine. However, when switching to the synchronized context, the ctx.wait method stops working.

use actix::prelude::*;
use tokio::time::{sleep, Duration};

#[derive(Message)]
#[rtype(result = "()")]
struct Sum(usize, usize);

struct Calculator;

impl Actor for Calculator {
    type Context = SyncContext<Self>;
    // type Context = Context<Self>;
}

impl Handler<Sum> for Calculator {
    type Result = ();

    fn handle(&mut self, msg: Sum, ctx: &mut Self::Context) -> Self::Result {
        let fut = stop(msg.0, msg.1);
        let actor_future = fut.into_actor(self);
        ctx.wait(actor_future);
    }
}

async fn stop(a: usize, b: usize) {
    sleep(Duration::from_millis(3000)).await;
    println!("{}", a + b);
}

#[actix::main]
async fn main() {
    let addr = SyncArbiter::start(3, || Calculator);
    // let addr = Calculator.start();
    let _res = addr.do_send(Sum(10, 5));

    sleep(Duration::from_millis(4000)).await;
}

I receive the following error message

error[E0599]: the method `wait` exists for mutable reference `&mut SyncContext<Calculator>`, but its trait bounds were not satisfied
   --> src/main.rs:21:13
    |
21  |         ctx.wait(actor_future);
    |             ^^^^ method cannot be called on `&mut SyncContext<Calculator>` due to unsatisfied trait bounds
    |
   ::: /home/lucas/.cargo/registry/src/index.crates.io-6f17d22bba15001f/actix-0.13.1/src/sync.rs:237:1
    |
237 | pub struct SyncContext<A>
    | -------------------------
    | |
    | doesn't satisfy `<_ as ActorFuture<_>>::Output = ()`
    | doesn't satisfy `_: ContextFutureSpawner<_>`
    | doesn't satisfy `actix::SyncContext<Calculator>: ActorFuture<_>`
    |
    = note: the following trait bounds were not satisfied:
            `<&mut actix::SyncContext<Calculator> as ActorFuture<_>>::Output = ()`
            which is required by `&mut actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `&mut actix::SyncContext<Calculator>: ActorFuture<_>`
            which is required by `&mut actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `<&&mut actix::SyncContext<Calculator> as ActorFuture<_>>::Output = ()`
            which is required by `&&mut actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `&&mut actix::SyncContext<Calculator>: ActorFuture<_>`
            which is required by `&&mut actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `<&mut &mut actix::SyncContext<Calculator> as ActorFuture<_>>::Output = ()`
            which is required by `&mut &mut actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `&mut &mut actix::SyncContext<Calculator>: ActorFuture<_>`
            which is required by `&mut &mut actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `<actix::SyncContext<Calculator> as ActorFuture<_>>::Output = ()`
            which is required by `actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `actix::SyncContext<Calculator>: ActorFuture<_>`
            which is required by `actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `<&actix::SyncContext<Calculator> as ActorFuture<_>>::Output = ()`
            which is required by `&actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>`
            `&actix::SyncContext<Calculator>: ActorFuture<_>`
            which is required by `&actix::SyncContext<Calculator>: actix::ContextFutureSpawner<_>

May I ask what you are trying to accomplish? As the name suggests, the sync arbiter is not meant to handle async tasks/futures. Rather, it is meant for CPU-bound tasks that need to block a thread for some time (which would be a bad thing to do on the async arbiter, which needs to handle the event loop and multiple actors, all in the same thread).

In my real project there is an actor that is responsible for processing data (Operator actor). However, this actor needs to request the data to be processed to the Buffer actor with add.send().await. Therefore, despite the Operator's main responsibility being synchronous, he needs to carry out an asynchronous operation.

Hmm, I see. I guess you have to block the actor then (which shouldn't be a problem as each actor in your SyncArbiter has its own thread, leaving the other instances to receive more tasks) while waiting for the message to complete. I'd use futures::executor::block_on for this, i.e. let res = futures::executor::block_on(add.send());

This approach didn't work.

    fn handle(&mut self, msg: Sum, ctx: &mut Self::Context) -> Self::Result {
        let fut = stop(msg.0, msg.1);
        let _res = futures::executor::block_on(fut);
        // let actor_future = fut.into_actor(self);
        // ctx.wait(actor_future);
    }

I am getting the following error.

thread '<unnamed>' panicked at src/main.rs:25:5:
there is no reactor running, must be called from the context of a Tokio 1.x runtime
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

@jofas Is it possible to create an ASYNC arbiter that only runs this actor in a separate thread? This way, I could keep the logic asynchronous normally, since the CPU bound function would not interfere with the actors that would be executed in the other arbiter.

Yeah, in your example you call tokio::time::sleep which needs to run in a tokio runtime. You could replace the future runtime with a tokio runtime running on the current thread, but this seems like so much overhead to me. Can you change your actors in such a way that your CPU bound actor receives all the data in a single message, rather than it needing to message some other actor? I think changing your logic would be the easiest solution to be honest. You could split your operator actor into the part that receives the request, gets the data from the buffer actor and forwards both to the sync part which only does the computation.

1 Like