[actix] Async method inside Actor::started()

I'm having trouble calling an asynchronous method within the Actor::started() function that changes the actor's attributes.

impl Actor for Manager {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.register(ctx.address().recipient());
        self.subscribe_system_async::<ClusterLog>(ctx);
        let fut = Box::pin(async {
            sleep(Duration::from_secs(10)).await;
            self.config();
            self.ranking_logic().await;
        });
        let future_actor = fut.into_actor(self);
        ctx.wait(future_actor);
    }
}
error[E0501]: cannot borrow `*self` as immutable because previous closure requires unique access
   --> src/actors/manager.rs:218:43
    |
213 |           let fut = Box::pin(async {
    |  ____________________________-
214 | |             sleep(Duration::from_secs(10)).await;
215 | |             self.config();
    | |             ---- first borrow occurs due to use of `*self` in generator
216 | |             self.ranking_logic().await;
217 | |         });
    | |_________- generator construction occurs here
218 |           let future_actor = fut.into_actor(self);
    |                                             ^^^^ second borrow occurs here
219 |           ctx.wait(future_actor);
    |           ---------------------- argument requires that `*self` is borrowed for `'static`

error[E0521]: borrowed data escapes outside of method
   --> src/actors/manager.rs:219:9
    |
210 |     fn started(&mut self, ctx: &mut Self::Context) {
    |                ---------
    |                |
    |                `self` is a reference that is only valid in the method body
    |                let's call the lifetime of this reference `'1`
...
219 |         ctx.wait(future_actor);
    |         ^^^^^^^^^^^^^^^^^^^^^^
    |         |
    |         `self` escapes the method body here
    |         argument requires that `'1` must outlive `'static`

Have you tried using actix::fut::wrap_future instead of into_actor here?

The same problem happens.

That's not entirely correct, because we in fact get rid of your first error: rustexplorer. As to your second error, I don't see how—given the signature of Actor::started—we are able to span a future into the context that captures &mut Self, as Context requires ActorFutures to be 'static. Can you make your ranking_logic synchronous somehow?

Sorry about that. I don't know about making the ranking_logic() method synchronous. I put a while sleep inside until messages from all workers in separated nodes are received.

    async fn ranking_logic(&mut self) {
        self.collect_metrics();

        // Workaround: Wait until all workers respond with *Metrics* Message
        // Actix-Telepathy not suport .send().await()
        // Coerce supports but is impossible to understend. No documentation
        while self.received_workers.values().any(|&score| score.is_none())
        {
            sleep(Duration::from_millis(100)).await;
        }

        let highest_operation_score = self
            .rank
            .iter()
            .max_by_key(|entry| entry.1)
            .expect("Failed to get the highest_score ");

        let lowest_operatin_score = self
            .rank
            .iter()
            .min_by_key(|entry| entry.1)
            .expect("Failed to get the lowest_score ");

        let pop_lowest_operation_node = self
            .operation
            .get_mut(&lowest_operatin_score.0)
            .expect("Lowest Node not find")
            .pop_back()
            .expect("Can not find the lowers node");

        // Add one worker to the operation with the highest score
        self.operation
            .get_mut(&highest_operation_score.0)
            .expect("Lowest Node not find")
            .push_back(pop_lowest_operation_node.clone());

        // Update the Worker configuration
        self.workers
            .get_mut(&pop_lowest_operation_node.worker_remote_addr) // WorkerAtribute
            .expect("Worker not found on list of workers")
            [pop_lowest_operation_node.index] =
            highest_operation_score.0.clone();

        self.configure_cluster();

        // Restore to default
        self.rank = self.default_rank.clone();
        self.received_workers = self.default_received_workers.clone();
    }

Hmm, maybe it would be better (more idiomatic within the context of the actix framework) to just trigger collect_metrics during startup (which I assume somehow pings your workers which respond with some message containing metrics?) and then receive the messages from your workers while your Manager is already up and running. When all messages are received you can proceed with updating your ranking. When someone needs the ranking while not every worker has responded, you could return a message that informs the caller that the ranking isn't ready yet.

1 Like

It worked gracefully. thank you very much sir @jofas. I removed the while with sleep and placed the condition inside the handler that received messages from the Worker.

fn started(&mut self, ctx: &mut Self::Context) {
        self.register(ctx.address().recipient());
        self.subscribe_system_async::<ClusterLog>(ctx);
        ctx.run_later(Duration::from_secs(10), |act, _ctx| {
            act.collect_metrics()
        });
    }
        if self.received_workers.values().all(|&score| score.is_some()) {
            ctx.notify(Rebalance);
            self.config();
        }
1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.