Actix-web + Websockets without an actor

Hi,

I'm trying to use actix-web to handle websockets, but I don't really want to use the actix itself, ie. I don't want to use actors. The only examples that I could find use an Actor. On the Websockets page there's a following statement:

Actix-web supports WebSockets with the actix-web-actors crate. It is possible to convert a request’s Payload to a stream of ws::Message with a web::Payload and then use stream combinators to handle actual messages, but it is simpler to handle websocket communications with an http actor.

I've looked into the actix-web-actors crate to see how they handle it there and there's a WsStream struct, which implements decoding the stream into messages, but I fail to see how to use it in practice. The problem is that actix-web needs to return a response. So the pseudo code would look like that:

async fn ws_index(
    req: &HttpRequest,
    stream: web::Payload,
) -> Result<HttpResponse, Error> {
    let res = handshake(req)?;
    res.streaming(handle_incoming_stream(stream));
    res
}

I pass a stream to process incoming messages and I also need to return the stream that will be returned to the client. The simplest way to implement it would be to create a channel, spawn a tokio process for processing incoming messages and return a channel converted to a stream to the client. The problem is that the incoming stream is not Send, so I can't use it in a separate tokio process.

My guess is that I should create a struct that would allow polling for new outbound messages and also process the incoming stream, but I'm not sure how to tackle this. I've tried to read actix code to see how it's done there, but it looks like I would have to understand much more about the internals of actix to get what's going on. Any pointers to stuff that I should look at?

You should be able to spawn it with tokio::task::spawn_local, since actix uses a Tokio LocalSet for its single-threaded executor.

I'm using multi threaded runtime, so I don't think it will work, unfortunately.

Are you sure? Please double-check. If actix is passing a non-Send value to an async function, it must be running in a LocalSet (or in block_on), so there's something up here.

1 Like

You're totally right! I haven't used actix extensively, so I think I assumed it would behave similarly to a Tokio runtime, but now that I'm reading the README it runs workers on separate threads. Great catch with the non-Send value, I haven't made that connection. I think I know how to go from here, I'll paste a solution once I'm done.

For anyone trying to implement something like this:

pub async fn ws_index(
    req: HttpRequest,
    mut stream: web::Payload,
) -> Result<HttpResponse, Error> {
    let mut res = handshake(&req)?;
    let (tx, rx) = mpsc::unbounded_channel::<Result<Bytes, actix_web::Error>>();
    tokio::task::spawn_local(async move {
        while let Some(chunk) = stream.next().await {
            let chunk = chunk.unwrap();
            let mut codec = Codec::new();
            let mut buf = BytesMut::new();
            buf.extend_from_slice(&chunk[..]);

            let frame = codec.decode(&mut buf).unwrap();
            let frame_str = format!("frame: {:?}", frame);

            let message = ws::Message::Text(frame_str);
            let mut output_buffer = BytesMut::new();
            codec.encode(message, &mut output_buffer).unwrap();
            let b = output_buffer.split().freeze();
            tx.send(Ok(b)).unwrap();
        }
    });

    Ok(res.streaming(rx))
}

This will read the stream, convert the byte input into Frame objects and respond with a debug ouput from a frame.

3 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.