Actix actor communication (Future combinators)


#1

I"m trying to work a few sync actors together but having no end of heartache. I’m hoping someone can point me in the right direction because I feel lost. I’ve included a minimal example but in my full example I’m trying to work together a reqwest call and a few diesel calls and the return the stored result to the user in an actix_web app. I can include the full code for that if it’ll help, but this was my minimal reproduction case:


#2

Not sure why the snippit link isn’t working, so here’s the code inline:

extern crate actix;
extern crate futures;

use actix::prelude::*;
use futures::Future;

#[derive(Debug)]
pub enum IncrError {
    Boom,
}

pub struct SyncIncr {
    pub counter: i32,
}

impl Actor for SyncIncr {
    type Context = SyncContext<Self>;
}

pub struct Add {
    pub input: i32,
}

impl Message for Add {
    type Result = Result<i32, IncrError>;
}

impl Handler<Add> for SyncIncr {
    type Result = Result<i32, IncrError>;

    fn handle(&mut self, msg: Add, _: &mut Self::Context) -> Self::Result {
        self.counter = self.counter + msg.input;
        Ok(self.counter.clone())
    }
}

#[derive(Debug)]
pub enum DecrError {
    Ugh,
}

pub struct SyncDecr {
    pub counter: i32,
}

impl Actor for SyncDecr {
    type Context = SyncContext<Self>;
}

pub struct Sub {
    pub input: i32,
}

impl Message for Sub {
    type Result = Result<i32, DecrError>;
}

impl Handler<Sub> for SyncDecr {
    type Result = Result<i32, DecrError>;

    fn handle(&mut self, msg: Sub, _: &mut Self::Context) -> Self::Result {
        self.counter = self.counter - msg.input;
        Ok(self.counter.clone())
    }
}

#[derive(Debug)]
pub enum SumError {
    Decr(DecrError),
    Incr(IncrError),
    Sum(String),
}

impl std::convert::From<DecrError> for SumError {
    fn from(e: DecrError) -> Self {
        SumError::Decr(e)
    }
}

impl std::convert::From<IncrError> for SumError {
    fn from(e: IncrError) -> Self {
        SumError::Incr(e)
    }
}

pub struct SyncSum {}

impl Actor for SyncSum {
    type Context = SyncContext<Self>;
}

pub struct Sum {
    pub a: i32,
    pub b: i32,
}

impl Message for Sum {
    type Result = Result<i32, SumError>;
}

impl Handler<Sum> for SyncSum {
    type Result = Result<i32, SumError>;

    fn handle(&mut self, msg: Sum, _: &mut Self::Context) -> Self::Result {
        Ok(msg.a + msg.b)
    }
}

fn main() {
    /*
      let addr = SyncArbiter::start(3, || {
        DbExecutor(SqliteConnection::establish("test.db").unwrap())
    });
    */
    let system = actix::System::new("actix-comms-test");

    let incr_addr = SyncArbiter::start(1, || SyncIncr { counter: 41 });
    let decr_addr = SyncArbiter::start(1, || SyncDecr { counter: 100 });
    let sum_addr = SyncArbiter::start(1, || SyncSum {});

    println!("Hello, world!");

    let f_add = incr_addr.send(Add { input: 1 });
    let f_sub = decr_addr.send(Sub { input: 100 });

    f_add
        .join(f_sub)
        .and_then(|res| match res {
            (Err(e), _) => actix::fut::err(SumError::from(e)),
            (_, Err(e)) => actix::fut::err(SumError::from(e)),
            (Ok(a), Ok(b)) => sum_addr.send(Sum { a, b }),
        }).wait(|res| match res {
            Err(e) => println!("ERROR: {}", e),
            Ok(sum) => println!("SUM: {}", sum),
        });

    let _ = system.run();
}

#3

So, I’ve coalesced the IncrError and DecrError to get the case down to one error

    Checking actor-frustration v0.1.0 (/Users/nsculli/actix/actor-frustration)
error[E0308]: match arms have incompatible types
   --> src/main.rs:128:25
    |
128 |           .and_then(|res| match res {
    |  _________________________^
129 | |             (Err(e), _) => actix::fut::err(SumError::from(e)),
130 | |             (_, Err(e)) => actix::fut::err(SumError::from(e)),
131 | |             (Ok(a), Ok(b)) => sum_addr.send(Sum { a, b }),
    | |                               --------------------------- match arm with an incompatible type
132 | |         }).wait(|res| match res {
    | |_________^ expected struct `actix::fut::FutureResult`, found struct `actix::prelude::Request`
    |
    = note: expected type `actix::fut::FutureResult<_, SumError, _>`

#4

How can I convert the actix::fut::FutureResult into an actix::prelude::Request or the converse?


#5

I’ve worked this example into an actix_web server to show where I’m trying to take the code. Still stuck trying to match the expected output type from the and_then after the join

Here is the code in a repo: https://gitlab.com/scull7/actix-actor-frustration

Receiving this error:


error[E0308]: match arms have incompatible types
   --> src/main.rs:130:25
    |
130 |           .and_then(|res| match res {
    |  _________________________^
131 | |             (Err(e), _) => actix::fut::err(SumError::from(e)),
132 | |             (_, Err(e)) => actix::fut::err(SumError::from(e)),
133 | |             (Ok(a), Ok(b)) => state.sum.send(Sum { a, b }),
    | |                               ---------------------------- match arm with an incompatible type
134 | |         })
    | |_________^ expected struct `actix::fut::FutureResult`, found struct `actix::prelude::Request`
    |
    = note: expected type `actix::fut::FutureResult<_, SumError, _>`
               found type `actix::prelude::Request<SyncSum, Sum>`

From the following code:

extern crate actix;
extern crate actix_web;
extern crate futures;
extern crate env_logger;

use actix::prelude::*;
use actix_web::{ AsyncResponder, FutureResponse, HttpResponse, Path, State };
use futures::Future;

#[derive(Debug)]
pub enum IncrError {
    Boom,
}

pub struct SyncIncr {
    pub counter: i32,
}

impl Actor for SyncIncr {
    type Context = SyncContext<Self>;
}

pub struct Add {
    pub input: i32,
}

impl Message for Add {
    type Result = Result<i32, IncrError>;
}

impl Handler<Add> for SyncIncr {
    type Result = Result<i32, IncrError>;

    fn handle(&mut self, msg: Add, _: &mut Self::Context) -> Self::Result {
        self.counter = self.counter + msg.input;
        Ok(self.counter.clone())
    }
}

#[derive(Debug)]
pub enum DecrError {
    Ugh,
}

pub struct SyncDecr {
    pub counter: i32,
}

impl Actor for SyncDecr {
    type Context = SyncContext<Self>;
}

pub struct Sub {
    pub input: i32,
}

impl Message for Sub {
    type Result = Result<i32, DecrError>;
}

impl Handler<Sub> for SyncDecr {
    type Result = Result<i32, DecrError>;

    fn handle(&mut self, msg: Sub, _: &mut Self::Context) -> Self::Result {
        self.counter = self.counter - msg.input;
        Ok(self.counter.clone())
    }
}

#[derive(Debug)]
pub enum SumError {
    Decr(DecrError),
    Incr(IncrError),
    Sum(String),
}

impl std::convert::From<DecrError> for SumError {
    fn from(e: DecrError) -> Self {
        SumError::Decr(e)
    }
}

impl std::convert::From<IncrError> for SumError {
    fn from(e: IncrError) -> Self {
        SumError::Incr(e)
    }
}

pub struct SyncSum {}

impl Actor for SyncSum {
    type Context = SyncContext<Self>;
}

pub struct Sum {
    pub a: i32,
    pub b: i32,
}

impl Message for Sum {
    type Result = Result<i32, SumError>;
}

impl Handler<Sum> for SyncSum {
    type Result = Result<i32, SumError>;

    fn handle(&mut self, msg: Sum, _: &mut Self::Context) -> Self::Result {
        Ok(msg.a + msg.b)
    }
}

struct AppState {
    decr: Addr<SyncDecr>,
    incr: Addr<SyncIncr>,
    sum: Addr<SyncSum>,
}

fn index(
    (params, state): (Path<(i32, i32)>, State<AppState>),
) -> FutureResponse<HttpResponse> {

    let add = params.0;
    let sub = params.1;

    let f_add = state.incr.send(Add { input: add });
    let f_sub = state.decr.send(Sub { input: sub });

    f_add
        .join(f_sub)
        .and_then(|res| match res {
            (Err(e), _) => actix::fut::err(SumError::from(e)),
            (_, Err(e)) => actix::fut::err(SumError::from(e)),
            (Ok(a), Ok(b)) => state.sum.send(Sum { a, b }),
        })
        from_err()
        .and_then(|res| match res {
            Ok(sum) => Ok(actix_web::HttpResponse::Ok().json(sum)),
            Err(_) => {
                println!("ERROR: {}", e);
                Ok(actix_web::HttpResponse::InternalServerError().into()),
            },
        }).responder()
        
}

fn main() {
    std::env::set_var("RUST_LOG", "actix_web=info");
    env_logger::init();
    let system = actix::System::new("actix-comms-test");

    actix_web::server::new(move ||
        actix_web::App::with_state(AppState {
            decr: SyncArbiter::start(1, || SyncDecr { counter: 100 }),
            incr: SyncArbiter::start(1, || SyncIncr { counter: 41 }),
            sum: SyncArbiter::start(1, || SyncSum {}),
        })
        .middleware(actix_web::middleware::Logger::default())
        .resource("/{add}/{sub}", |r| r.method(actix_web::http::Method::GET)
                  .with(index))
    ).bind("127.0.0.1:8088")
        .unwrap()
        .start();

    println!("Started http server: 127.0.0.1:8088");

    let _ = system.run();
}

#6

Usually, when you have this situation (wanting to return 1 of 2 different types) you use https://docs.rs/futures/0.1.25/futures/future/enum.Either.html. This requires the 2 types to yield the same Item and Error types, but can otherwise be completely different types themselves.


#7

I don’t want to return 2 different types. I want to return a single type. I think the type mismatch is coming from my use of actix::fut::err, perhaps there’s another way to generate an error of the proper type. I want to return something like FutureResult<Result<i32, sumError>, E>>. Is Either the proper way of getting there?

Here’s the latest error I’m receiving:

    |
127 |           .and_then(|res| match res {
    |  _________________________^
128 | |             (Err(e), _) => actix::fut::err::<i32, _, SyncSum>(SumError::from(e)),
129 | |             (_, Err(e)) => actix::fut::err::<i32, _, SyncSum>(SumError::from(e)),
130 | |             (Ok(a), Ok(b)) => state.sum.send(Sum { a, b }).from_err(),
    | |                               --------------------------------------- match arm with an incompatible type
131 | |         }).and_then(|res| match res {
    | |_________^ expected struct `actix::fut::FutureResult`, found struct `futures::future::FromErr`
    |
    = note: expected type `actix::fut::FutureResult<i32, SumError, SyncSum>`
               found type `futures::future::FromErr<actix::prelude::Request<SyncSum, Sum>, _>`

#8

Well, you have 2 different types, whether you wanted to or not :slight_smile:

actix::fut::err is just an export of the same function in futures::future module - it returns a FutureResult, which is a type that implements Future.

state.sum.send(...) returns actix::prelude::Request, which is the 2nd type and also implements Future. You can’t return FutureResult<...> for both because this type is used when you already have the value (or error) to return, but you don’t here - you’ve only submitted some work for doing the sending, and actix gave you a future representing its completion.

Either is used in precisely cases like this. You have 2 futures, each represented by different types, but if they yield the same type of item and error with the same type, you can “unify” them via Either.

You will likely have to adapt the error type of Request to be SumError and may need to adapt the return value as well, depending on what value you want the following combinatory to take.


#9

There has to be a better way… :cry: But this does compile and work…

fn index((params, state): (Path<(i32, i32)>, State<AppState>)) -> FutureResponse<HttpResponse> {
    let add = params.0;
    let sub = params.1;

    let f_add = state.incr.send(Add { input: add });
    let f_sub = state.decr.send(Sub { input: sub });

    f_add
        .join(f_sub)
        .map_err(SumError::from)
        .and_then(|res| match res {
            (Err(e), _) => futures::future::err(SumError::from(e)),
            (_, Err(e)) => futures::future::err(SumError::from(e)),
            (Ok(a), Ok(b)) => futures::future::ok((a, b)),
        }).and_then(move |(a, b)| state.sum.send(Sum { a, b }).map_err(SumError::from))
        .then(|res| match res {
            Ok(Ok(sum)) => Ok(actix_web::HttpResponse::Ok().json(sum)),
            Ok(Err(e)) => {
                println!("ERROR: {:?}", e);
                Ok(actix_web::HttpResponse::InternalServerError().finish())
            }
            Err(e) => {
                println!("ERROR: {:?}", e);
                Ok(actix_web::HttpResponse::InternalServerError().finish())
            }
        }).responder()
}

@vitalyd Thank you for pointing out the error type difference.


#10

What would be the best approach to flatten the code?


#11

flatten() and proper error coalescing are your friends. :raised_hands:

The code is no longer a trail of tears:

fn index((params, state): (Path<(i32, i32)>, State<AppState>)) -> FutureResponse<HttpResponse> {
    let add = params.0;
    let sub = params.1;

    state
        .incr
        .send(Add { input: add })
        .join(state.decr.send(Sub { input: sub }))
        .flatten()
        .and_then(move |(a, b)| state.sum.send(Sum { a, b }).flatten())
        .map(|sum| actix_web::HttpResponse::Ok().json(sum))
        .map_err(actix_web::Error::from)
        .responder()
}

This required me to add a consistent error struct that is used as the return from all of the actors and implements the actix_web::error::From trait. But the extra effort to unify the error structure makes the actor communication so much more ergonomic. (IMHO, it takes it from unusable to usable).

Heres’s the error code:

#[derive(Debug)]
pub enum SystemError {
    Decr(DecrError),
    Incr(IncrError),
    Mailbox(actix::MailboxError),
    Sum(SumError),
}

impl std::convert::From<actix::MailboxError> for SystemError {
    fn from(e: actix::MailboxError) -> Self {
        SystemError::Mailbox(e)
    }
}

impl std::convert::From<SystemError> for actix_web::Error {
    fn from(e: SystemError) -> Self {
        match e {
            SystemError::Mailbox(e) => {
                println!("MAIL BOX ERROR: {:?}", e);
                actix_web::error::ErrorInternalServerError(e)
            }
            SystemError::Decr(e) => {
                println!("DECR ERROR: {:?}", e);
                actix_web::error::ErrorBadRequest(e)
            }
            SystemError::Incr(e) => {
                println!("INCR ERROR: {:?}", e);
                actix_web::error::ErrorBadRequest(e)
            }
            SystemError::Sum(e) => {
                println!("SUM ERROR: {:?}", e);
                actix_web::error::ErrorBadRequest(e)
            }
        }
    }
}

The full code is available here: https://gitlab.com/scull7/actix-actor-frustration/blob/master/src/main.rs


#12

Hi,
Maybe you can help me to understand a little thing as you struggled with what I’m currently struggling with:

In a chain of futures, all and_then must return the same Error type ? Otherwise we can convert errors using .map_err() But then our error types must implement From or Into other error types ?


#13

Precisely, you must have a single error type that flows through the entire chain. At least for any errors which you would like to bubble up to the top.


#14

Ack, Thanks!