Actix websocket proxy message loss

I'm attempting to implement a kind of websocket proxy with actix-web. Since I couldn't find a complete example to start from, I copy-pasted from the actix websocket examples.

Here is my code so far:

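use std::time::Duration;

use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use actix_web_actors::ws;
use awc::error::WsProtocolError;
use awc::ws::{Codec, Frame};
use awc::BoxedSocket;
use futures::channel::mpsc::{Receiver, Sender};
use futures::stream::SplitSink;
use futures::SinkExt; // provides `send(..).await` on Sender
use log::{debug, error, info, trace, warn};

// JsonRpc is a project-specific message type defined elsewhere.

pub struct WsProxy {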
    sintql: Addr<SintQl>,
    rcv: Receiver<JsonRpc>,
    user: String,
}

impl Actor for WsProxy {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.listen_for_response(ctx);
    }
}

impl WsProxy {
    fn listen_for_response(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
        ctx.run_interval(Duration::from_millis(50), |act, c| {
            match act.rcv.try_next() {
                Ok(Some(response)) => {
                    let response = response.0;
                    info!("send back response for user {} {:?}", act.user, response);
                    c.write_raw(response);
                }
                Ok(None) => {
                    debug!("receiver got a response None and stopping");
                    act.finished(c);
                }
                Err(e) => trace!("receiver got an error {:?}", e),
            }
        });
    }
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsProxy {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(msg) => {
                info!(
                    "received request from user {} and forward {:?}",
                    self.user, msg
                );

                let msg: JsonRpc = msg.into();

                ctx.spawn(
                    self.sintql
                        .send(msg)
                        .into_actor(self)
                        .map(|_, _, _| ()),
                );
            }
            Err(e) => {
                warn!(
                    "received an error from client {:?}\ngoing to stop connection",
                    e
                );
                self.finished(ctx);
            }
        }
    }
}

struct SintQl {
    out: SinkWrite<ws::Message, SplitSink<Framed<BoxedSocket, Codec>, ws::Message>>,
    snd: Sender<JsonRpc>,
    buf: Vec<Frame>,
}

impl Actor for SintQl {
    type Context = Context<Self>;
}

impl Handler<JsonRpc> for SintQl {
    type Result = ();

    fn handle(&mut self, msg: JsonRpc, _ctx: &mut Self::Context) {
        debug!("send a message to sintql {:?}", msg);
        if let Err(e) = self.out.write(msg.0) {
            error!("error send message to sintql {:?}", e);
        }
    }
}

impl StreamHandler<Result<Frame, WsProtocolError>> for SintQl {
    fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(msg) => {
                trace!("received from sintql {:?}", msg);

                // In case message is too long and is divided in parts, all the parts
                // have to be sent back in sequence, without any other message in the
                // middle.
                if let Frame::Continuation(actix_http::ws::Item::Last(_)) = msg {
                    self.buf.push(msg);
                    while !self.buf.is_empty() {
                        let mut snd = self.snd.clone();
                        let cmsg = self.buf.remove(0);
                        trace!("send continuation message");
                        match snd.try_send(cmsg.into()) {
                            Ok(_) => trace!("sent continuation"),
                            Err(e) => error!("sent continuation failed {:?}", e),
                        }
                    }
                } else if let Frame::Continuation(_) = msg {
                    self.buf.push(msg);
                } else {
                    let mut snd = self.snd.clone();
                    let fut = async move {
                        trace!("send sintql response to ws_proxy {:?}", msg);
                        snd.send(msg.into()).await
                    };

                    ctx.spawn(fut.into_actor(self).map(|_, _, _| ()));
                }
            }
            Err(e) => {
                error!("got an error from sintql {:?}", e);
                // XXX: Overflow error will loop forever
                self.finished(ctx);
            }
        };
    }
}

impl actix::io::WriteHandler<WsProtocolError> for SintQl {}
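The continuation handling in SintQl buffers fragments until the last one arrives and then forwards them all in order. A std-only sketch of that idea, assuming a hypothetical `Frame` enum (not actix's frame type) and a hypothetical `Reassembler`; it uses `drain(..)` rather than `remove(0)` in a loop, which shifts the remaining elements on every call:

```rust
/// Hypothetical stand-in for a websocket frame; NOT actix's `Frame` type.
#[derive(Debug, PartialEq)]
enum Frame {
    /// A complete, unfragmented message.
    Text(String),
    /// First fragment of a fragmented message.
    First(String),
    /// A middle fragment.
    Continue(String),
    /// The final fragment.
    Last(String),
}

/// Buffers fragments until the last one arrives, then releases them together.
struct Reassembler {
    buf: Vec<Frame>,
}

impl Reassembler {
    fn new() -> Self {
        Reassembler { buf: Vec::new() }
    }

    /// Returns the frames that are ready to be forwarded, in their original order.
    fn push(&mut self, frame: Frame) -> Vec<Frame> {
        match frame {
            // Complete messages are forwarded immediately.
            Frame::Text(_) => vec![frame],
            // The last fragment flushes the whole sequence in one go, so no
            // other message can end up between its parts.
            Frame::Last(_) => {
                self.buf.push(frame);
                // drain(..) empties the buffer front to back in a single pass.
                self.buf.drain(..).collect()
            }
            // First and middle fragments are only buffered.
            Frame::First(_) | Frame::Continue(_) => {
                self.buf.push(frame);
                Vec::new()
            }
        }
    }
}

fn main() {
    let mut r = Reassembler::new();
    for f in vec![
        Frame::First("{\"id\":".to_string()),
        Frame::Continue("\"1\"".to_string()),
        Frame::Last("}".to_string()),
    ] {
        let ready = r.push(f);
        if !ready.is_empty() {
            println!("forwarding {:?}", ready);
        }
    }
}
```

Note that this only keeps the fragments of one message together; it says nothing about ordering relative to complete messages forwarded through a separately spawned task.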

The code seems to work, but in some cases, when messages arrive very close together, some of them get lost.

The bad part is that I don't get any error: looking at the log, everything seems fine, but the message is simply missing.

If I run the same test directly against the real websocket server, no messages are lost, so I know the bug is in my proxy.

This is what I see in the log when it works:

[2020-04-21T10:19:24Z INFO  sint_api_gateway::ws_proxy] received request from user Unknown and forward Text("{\"jsonrpc\":\"2.0\",...\"id\":\"1587464364060834886\"}")
[2020-04-21T10:19:24Z DEBUG sint_api_gateway::ws_proxy] send a message to sintql JsonRpc(Text("{\"jsonrpc\":\"2.0\",...,\"id\":\"1587464364060834886\"}"))

The first line is the message received from the client browser; the second is the SintQl actor sending that message on to the websocket server.

And when it does not work:

[2020-04-21T10:19:24Z INFO  sint_api_gateway::ws_proxy] received request from user Unknown and forward Text("{\"jsonrpc\":\"2.0\",...,\"id\":\"1587464364045767034\"}    ")
[2020-04-21T10:19:24Z INFO  sint_api_gateway::ws_proxy] received request from user Unknown and forward Text("{\"jsonrpc\":\"2.0\",...\"id\":\"158746436404574323\"}")
[2020-04-21T10:19:24Z DEBUG sint_api_gateway::ws_proxy] send a message to sintql JsonRpc(Text("{\"jsonrpc\":\"2.0\",...\"id\":\"1587464364045767034\"}"))

Here a second message arrives in between, while the first one has not been sent yet.
Yet the last line says the first message is about to be sent, and the code is:

        debug!("send a message to sintql {:?}", msg);
        if let Err(e) = self.out.write(msg.0) {
            error!("error send message to sintql {:?}", e);
        }

Since no error is logged, I assume the message was written, but the websocket server never receives anything.

I'd appreciate any hint about a mistake in my code, as I can't find the reason for this behaviour.

Thanks in advance for any help.
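A write that returns Ok on a buffered sink or channel usually only means the item was queued, not that the other side received it; I'm assuming actix's `SinkWrite::write` behaves similarly, though I haven't confirmed it. The std documentation states this explicitly for `std::sync::mpsc::Sender::send`: Ok does not mean the data will be received. A minimal std-only illustration (function names are hypothetical):

```rust
use std::sync::mpsc;

/// Sends every message into a channel and returns how many sends returned Ok.
/// The receiver is then dropped without reading anything, so all of those
/// "successful" messages are discarded and no error is ever reported.
fn send_then_drop_receiver(msgs: &[&str]) -> usize {
    let (tx, rx) = mpsc::channel::<String>();
    let mut ok = 0;
    for m in msgs {
        if tx.send(m.to_string()).is_ok() {
            ok += 1;
        }
    }
    drop(rx); // the queued messages are silently thrown away here
    ok
}

/// Only when the receiving side is already gone does send return an error.
fn send_after_drop_fails() -> bool {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx);
    tx.send("late".to_string()).is_err()
}

fn main() {
    println!("sends reported Ok: {}", send_then_drop_receiver(&["a", "b", "c"]));
    println!("send after drop failed: {}", send_after_drop_fails());
}
```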

I think the issue may be in the listen_for_response() method, as it blocks.

    fn listen_for_response(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
        ctx.run_interval(Duration::from_millis(50), |act, c| {
            match act.rcv.try_next() {
                Ok(Some(response)) => {
                    let response = response.0;
                    info!("send back response for user {} {:?}", act.user, response);
                    c.write_raw(response);
                }
                Ok(None) => {
                    debug!("receiver got a response None and stopping");
                    act.finished(c);
                }
                Err(e) => trace!("receiver got an error {:?}", e),
            }
        });
    }

I would like to use async here, but unfortunately I can't find a way that works.

Even spawning a thread is not easy here, as rcv and the context cannot be cloned or moved.

The worst part is that I would expect an error if the message cannot be written to the SinkWrite.
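Separately from the lost messages, the interval callback above takes at most one message per 50 ms tick, so responses queue up whenever they arrive faster than that. A std-only sketch of a tick that drains everything currently queued; std's `try_recv` stands in for futures' `Receiver::try_next` (which reports "empty" as `Err` and "closed" as `Ok(None)`), and `Tick`/`poll_tick` are hypothetical names:

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Result of one polling tick.
#[derive(Debug, PartialEq)]
enum Tick {
    /// Channel still open; these messages were queued (possibly none).
    Open(Vec<String>),
    /// All senders are gone; these were the last queued messages.
    Closed(Vec<String>),
}

/// Drains everything queued right now instead of one message per tick.
/// try_recv never blocks, so the tick returns as soon as the queue is empty.
fn poll_tick(rx: &Receiver<String>) -> Tick {
    let mut got = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(msg) => got.push(msg),
            Err(TryRecvError::Empty) => return Tick::Open(got),
            Err(TryRecvError::Disconnected) => return Tick::Closed(got),
        }
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send("r1".to_string()).unwrap();
    tx.send("r2".to_string()).unwrap();
    println!("{:?}", poll_tick(&rx));
    drop(tx);
    println!("{:?}", poll_tick(&rx));
}
```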

Which version of actix are you using?

Hi @alice, here are my versions: Actix 0.9, Actix-Web 2.0.0.

Where is the WebsocketContext defined? I can't find it in either crate.

It is in the actix_web_actors::ws module (actix_web_actors version 2.0.0).

If you want to use async inside the run_interval, could you not just spawn an async block?

I can't do that, because the Receiver (it is a futures::channel::mpsc::Receiver) cannot be cloned.
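A Receiver can't be cloned, but it can be moved exactly once. A common trick is to keep it in an `Option` and `take()` it when the consumer starts; with actix, the taken receiver (which implements `Stream`) could then be handed to something like `AsyncContext::add_stream` in `started()`, though that part is an assumption not tested here. A std-only sketch with a thread as the consumer; `Proxy` and `start_consumer` are hypothetical names:

```rust
use std::sync::mpsc::Receiver;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the actor state. The receiver sits in an Option
/// so it can be moved out exactly once instead of cloned.
struct Proxy {
    rcv: Option<Receiver<String>>,
}

impl Proxy {
    /// Moves the receiver into a consumer thread that collects messages until
    /// every sender has been dropped. Panics if called twice.
    fn start_consumer(&mut self) -> JoinHandle<Vec<String>> {
        let rcv = self.rcv.take().expect("consumer already started");
        thread::spawn(move || rcv.iter().collect::<Vec<String>>())
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let mut proxy = Proxy { rcv: Some(rx) };
    let consumer = proxy.start_consumer();
    tx.send("response 1".to_string()).unwrap();
    drop(tx); // closing the channel lets the consumer finish
    println!("{:?}", consumer.join().unwrap());
}
```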

I would rather avoid run_interval altogether and add a stream to handle the messages from the Receiver, but I had the same issue.

So the only way I can make it work is with run_interval, which blocks, but I don't understand why it loses messages on the SinkWrite. I would expect a loss in performance, but that is not an issue, as the proxy serves a web app with requests from users.

For example, if I try to spawn the whole body of the closure:

     fn listen_for_response(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
         ctx.run_interval(
             Duration::from_millis(RECEIVER_CHANNEL_POLLING),
             |act, c| {
                 c.spawn(
                     async move {
                         match act.rcv.try_next() {
                             Ok(Some(response)) => {
                                 let response = response.0;
                                 info!("send back response for user {} {:?}", act.user, response);
                                 c.write_raw(response);
                             }
                             Ok(None) => {
                                 debug!("receiver got a response None and stopping");
                                 act.finished(c);
                             }
                             Err(e) => trace!("receiver got an error {:?}", e),
                         }
                     }.into_actor(act)
                     .map(|_, _, _| ())
                 );

                 ()
             },
         );
     }

But I get this:

   Compiling sint-api-gateway v0.1.0 (/usr/local/projects/SOC/SQM/SINT/sd-backend/sint-api-gateway)
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/ws_proxy.rs:66:32
   |
66 |                       async move {
   |  ________________________________^
67 | |                         match act.rcv.try_next() {
68 | |                             Ok(Some(response)) => {
69 | |                                 let response = response.0;
...  |
78 | |                         }
79 | |                     }.into_actor(act)
   | |_____________________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #2 defined on the body at 64:13...
  --> src/ws_proxy.rs:64:13
   |
64 | /             |act, c| {
65 | |                 c.spawn(
66 | |                     async move {
67 | |                         match act.rcv.try_next() {
...  |
83 | |                 ()
84 | |             },
   | |_____________^
   = note: ...so that the types are compatible:
           expected &mut ws_proxy::WsProxy
              found &mut ws_proxy::WsProxy
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `actix::fut::map::Map<actix::fut::FutureWrap<impl core::future::future::Future, ws_proxy::WsProxy>, [closure@src/ws_proxy.rs:80:26: 80:38]>` will meet its required lifetime bounds
  --> src/ws_proxy.rs:65:19
   |
65 |                 c.spawn(
   |                   ^^^^^

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/ws_proxy.rs:66:32
   |
66 |                       async move {
   |  ________________________________^
67 | |                         match act.rcv.try_next() {
68 | |                             Ok(Some(response)) => {
69 | |                                 let response = response.0;
...  |
78 | |                         }
79 | |                     }.into_actor(act)
   | |_____________________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #3 defined on the body at 64:13...
  --> src/ws_proxy.rs:64:13
   |
64 | /             |act, c| {
65 | |                 c.spawn(
66 | |                     async move {
67 | |                         match act.rcv.try_next() {
...  |
83 | |                 ()
84 | |             },
   | |_____________^
   = note: ...so that the types are compatible:
           expected &mut actix_web_actors::ws::WebsocketContext<ws_proxy::WsProxy>
              found &mut actix_web_actors::ws::WebsocketContext<ws_proxy::WsProxy>
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `actix::fut::map::Map<actix::fut::FutureWrap<impl core::future::future::Future, ws_proxy::WsProxy>, [closure@src/ws_proxy.rs:80:26: 80:38]>` will meet its required lifetime bounds
  --> src/ws_proxy.rs:65:19
   |
65 |                 c.spawn(
   |                   ^^^^^

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0495`.
error: could not compile `sint-api-gateway`.

To learn more, run the command again with --verbose.

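And that is because the closure is defined as:

fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where
    F: FnOnce(&mut A, &mut A::Context) + 'static,

and must have the 'static lifetime. The future passed to spawn must also be 'static, so the async block cannot hold on to act and c, which are only borrowed for a single call of the closure.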
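The same 'static bound shows up with `std::thread::spawn`: the task cannot borrow anything that only lives for the current call, such as the `act` and `c` arguments. A std-only sketch of the usual workaround, cloning the owned data the task needs and moving it in (`State` and `respond_later` are hypothetical names). In actix, this is also why the closure given to an `ActorFuture`'s `.map(|result, act, ctx| ...)` receives the actor and context as arguments rather than capturing them.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical state that a callback only borrows, like `act` in run_interval.
struct State {
    user: String,
}

/// thread::spawn requires its closure to be 'static, just like the future
/// passed to ctx.spawn. Capturing `state` by reference would be rejected, so
/// the field the task needs is cloned and moved in instead.
fn respond_later(state: &State) -> JoinHandle<String> {
    let user = state.user.clone(); // owned copy, so the closure borrows nothing
    thread::spawn(move || format!("send back response for user {}", user))
}

fn main() {
    let state = State { user: "Unknown".to_string() };
    let handle = respond_later(&state);
    println!("{}", handle.join().unwrap());
}
```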

I think I found my error.

In the WsProxy StreamHandler I spawn a send to the other actor, and that is not a good idea.

Here the offending code:

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsProxy {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        ...
                ctx.spawn(
                    self.sintql
                        .send(msg)
                        .into_actor(self)
                        .map(|_, _, _| ()),
                );
            ...
    }
}

Instead of spawning the send as an independent task, I should have made the context wait for it to complete.

Here the fix:

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsProxy {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        ...
                ctx.wait(
                    self.sintql
                        .send(msg)
                        .into_actor(self)
                        .map(|_, _, _| ()),
                );
            ...
    }
}

With just that change, no more messages are lost.
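Per the actix docs, `ctx.wait` stops the actor from processing further incoming events until the future resolves, whereas `ctx.spawn` lets the next message be handled immediately. A std-only sketch of that difference, with threads standing in for spawned tasks (`forward_sequential` and `forward_spawned` are hypothetical names): the sequential version always preserves order, while the spawned version delivers every message but in no guaranteed order. This illustrates reordering only; it does not by itself prove why messages vanished in the proxy.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Like ctx.wait: each message is fully handed over before the next one is
/// handled, so the receiving side sees them in arrival order.
fn forward_sequential(msgs: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for &m in msgs {
        tx.send(m).unwrap();
    }
    drop(tx);
    rx.iter().collect()
}

/// Like ctx.spawn: every forward is an independent task, so the order in which
/// they complete depends on scheduling. A shrinking delay lets later messages
/// overtake earlier ones.
fn forward_spawned(msgs: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let n = msgs.len() as u64;
    let handles: Vec<_> = msgs
        .iter()
        .enumerate()
        .map(|(i, &m)| {
            let tx = tx.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(5 * (n - i as u64)));
                tx.send(m).unwrap();
            })
        })
        .collect();
    drop(tx);
    for h in handles {
        h.join().unwrap();
    }
    rx.iter().collect()
}

fn main() {
    let input: [u64; 5] = [1, 2, 3, 4, 5];
    println!("sequential: {:?}", forward_sequential(&input));
    println!("spawned:    {:?}", forward_spawned(&input));
}
```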

Thanks for help @alice.
