Help needed to understand the relation between tokio, tungstenite and futures

Hi rustaceans!

I am trying to implement a small toy project in order to get familiar with rust. Maybe I am biting off a little more than I can chew, I would like to get a small websocket API client for slack working.

Browsing around, it seems like tungstenite and its async variant tokio-tungstenite seem like good candidates for the core crates.

However, I can't seem to wrap my head around the typing of all of this.

Here is the function I have, that should allow me to open a websocket connection:

pub async fn open(token: &str) -> Result<futures_util::stream::FilterMap<WebSocketStream<MaybeTlsStream<TcpStream>>, ?, ?>, ConnectError> {
    let wsopen = post(token, "https://slack.com/api/apps.connections.open")
        .await?
        .json::<WSResponse>()
        .await;

    match wsopen {
        Ok(WSResponse::Open(WSOpen { url, .. })) => {
            match connect_async(url).await {
                Ok((ws_stream, _http_response)) => {
                    Ok(ws_stream.filter_map(|m| {
                        m.map(|msg| msg.into_text()).ok()
                    }))
                },
                Err(e) => Err(ConnectError::Websocket(e)),
            }
        },
        Ok(WSResponse::Error(WSError { error, .. })) => Err(ConnectError::Slack(error)),
        Err(e) => Err(ConnectError::Network(e)),
    }
}

I think I have a decent grasp around everything up until I try to convert the stream of tungstenite::Message into a stream of simple Strings using the filter_map function.

What should the two ? types in the function signature be? What do they actually represent in the filter_map computation?

Or maybe, this is a typical XY problem, and I am using the API completely wrong and I should actually structure things differently? My idea is to expose a function that is able to take an auth-token and return a stream of parsed Events, to which you then can also "write" events. All of this using async.

I would be really grateful about any help you can give me, even just docs or books to read on the subject :)!

Thanks!

You should not try to mention types like FilterMap in function signatures - use the impl Trait syntax instead.

pub async fn open(token: &str) -> Result<impl Stream<Item = ...>, ConnectError> {

That said, I generally recommend keeping the websocket in its original type and not using things like filter_map. Then, you can use the StreamExt::next method in a loop to read from it.

1 Like

Ah! I get it. This makes everything a lot clearer, thank you!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.