I'm attempting to implement a sort of websocket proxy with actix-web. As I cannot find a full example to start, I just copy-paste from the actix examples about webscokets.
Here is my code so far:
pub struct WsProxy {
sintql: Addr<SintQl>,
rcv: Receiver<JsonRpc>,
user: String,
}
impl Actor for WsProxy {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.listen_for_response(ctx);
}
}
impl WsProxy {
fn listen_for_response(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(Duration::from_millis(50), |act, c| {
match act.rcv.try_next() {
Ok(Some(response)) => {
let response = response.0;
info!("send back response for user {} {:?}", act.user, response);
c.write_raw(response);
}
Ok(None) => {
debug!("receiver got a response None and stopping");
act.finished(c);
}
Err(e) => trace!("receiver got an error {:?}", e),
}
});
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsProxy {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(msg) => {
info!(
"received request from user {} and forward {:?}",
self.user, msg
);
let msg: JsonRpc = msg.into();
ctx.spawn(
self.sintql
.send(msg)
.into_actor(self)
.map(|_, _, _| ()),
);
}
Err(e) => {
warn!(
"received an error from client {:?}\ngoing to stop connection",
e
);
self.finished(ctx);
}
}
}
}
struct SintQl {
out: SinkWrite<ws::Message, SplitSink<Framed<BoxedSocket, Codec>, ws::Message>>,
snd: Sender<JsonRpc>,
buf: Vec<Frame>,
}
impl Actor for SintQl {
type Context = Context<Self>;
}
impl Handler<JsonRpc> for SintQl {
type Result = ();
fn handle(&mut self, msg: JsonRpc, _ctx: &mut Self::Context) {
debug!("send a message to sintql {:?}", msg);
if let Err(e) = self.out.write(msg.0) {
error!("error send message to sintql {:?}", e);
}
}
}
impl StreamHandler<Result<Frame, WsProtocolError>> for SintQl {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(msg) => {
trace!("received from sintql {:?}", msg);
// In case message is too long and is divided in parts, all the parts
// have to be sent back in sequence, without any other message in the
// middle.
if let Frame::Continuation(actix_http::ws::Item::Last(_)) = msg {
self.buf.push(msg);
while !self.buf.is_empty() {
let mut snd = self.snd.clone();
let cmsg = self.buf.remove(0);
trace!("send continuation message");
match snd.try_send(cmsg.into()) {
Ok(_) => trace!("sent continutation"),
Err(e) => error!("sent continuation failed {:?}", e),
}
}
} else if let Frame::Continuation(_) = msg {
self.buf.push(msg);
} else {
let mut snd = self.snd.clone();
let fut = async move {
trace!("send sintql response to ws_proxy {:?}", msg);
snd.send(msg.into()).await
};
ctx.spawn(fut.into_actor(self).map(|_, _, _| ()));
}
}
Err(e) => {
error!("got an error from sintql {:?}", e);
// XXX: Overflow error will loop forever
self.finished(ctx);
}
};
}
}
impl actix::io::WriteHandler<WsProtocolError> for SintQl {}
That code looks to work fine, but in some cases, when I have very near messages, I got some of them missed.
The bad part is that I don't get an error, just looking on the log all seems fine, but just missed the message.
If I try the test on the actual websocket server, I don't miss any of the messages, so I know is a bug on my proxy.
What I see on the log is the following when it worked:
[2020-04-21T10:19:24Z INFO sint_api_gateway::ws_proxy] received request from user Unknown and forward Text("{\"jsonrpc\":\"2.0\",...\"id\":\"1587464364060834886\"}")
[2020-04-21T10:19:24Z DEBUG sint_api_gateway::ws_proxy] send a message to sintql JsonRpc(Text("{\"jsonrpc\":\"2.0\",...,\"id\":\"1587464364060834886\"}"))
The first line is the message received from a client browser, the second is the SintQl Actor that sends to the websocket server the message it received.
And when it does not work:
[2020-04-21T10:19:24Z INFO sint_api_gateway::ws_proxy] received request from user Unknown and forward Text("{\"jsonrpc\":\"2.0\",...,\"id\":\"1587464364045767034\"} ")
[2020-04-21T10:19:24Z INFO sint_api_gateway::ws_proxy] received request from user Unknown and forward Text("{\"jsonrpc\":\"2.0\",...\"id\":\"158746436404574323\"}")
[2020-04-21T10:19:24Z DEBUG sint_api_gateway::ws_proxy] send a message to sintql JsonRpc(Text("{\"jsonrpc\":\"2.0\",...\"id\":\"1587464364045767034\"}"))
Here there is a message received in the middle, while it still have to send the first.
But the last line say it is going to send it and the code is:
debug!("send a message to sintql {:?}", msg);
if let Err(e) = self.out.write(msg.0) {
error!("error send message to sintql {:?}", e);
}
As I don't find any error, I suppese he wrote the message, but the websocket server, doesn't receive anything.
I hope I can have any hint on some mistake on my code here, as I can't find the reason for such a behaviour.
Thanks in advance for any help.