I'm using SSE with axum and need to transform a `BroadcastStream<String>` into a stream of `sse::Event`. Is that possible?

Using the code below I don't understand how to transform:

mystream: BroadcastStream<String>

to

mystream: BroadcastStream<sse::Event>


Can you help me understand?

As you can see, the issue is in the function `sse_handler_as_string()`.

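// Imports assumed for axum 0.6 and tokio-stream (with its `sync` feature enabled).
use std::sync::Arc;

use axum::{
    extract::State,
    response::{
        sse::{Event, KeepAlive, Sse},
        Html,
    },
    routing::get,
    Router,
};
use tokio::sync::broadcast;
use tokio_stream::{
    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
    Stream, StreamExt,
};

struct AppState {
    tx: broadcast::Sender<Event>,
    tx_as_string: broadcast::Sender<String>,
}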

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<Event>(100);

    let (tx_as_string, _) = broadcast::channel::<String>(100);

    let app_state = Arc::new(AppState { tx, tx_as_string });

    let app = Router::new()
        .route("/send_message", get(send_message))
        .route("/sse", get(sse_handler))
        .route("/send_message_as_string", get(send_message_as_string))
        .route("/sse_as_string", get(sse_handler_as_string))
        .with_state(app_state);

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn send_message(State(app_state): State<Arc<AppState>>) -> Html<&'static str> {
    app_state
        .tx
        .send(Event::default().data("custom_data"))
        .expect("send message");

    Html("<h1>Hello, World!</h1>")
}

async fn sse_handler(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
    let rx = app_state.tx.subscribe();

    let mystream = BroadcastStream::new(rx);

    Sse::new(mystream).keep_alive(KeepAlive::default())
}

async fn send_message_as_string(State(app_state): State<Arc<AppState>>) -> Html<&'static str> {
    app_state
        .tx_as_string
        .send("custom_data".to_string())
        .expect("send message");

    Html("<h1>Hello, World!</h1>")
}

async fn sse_handler_as_string(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<String, BroadcastStreamRecvError>>> {
    let rx = app_state.tx_as_string.subscribe();

    let mystream = BroadcastStream::new(rx);

    // HOW TO TRANSFORM mystream<String> TO axum::response::sse::Event here???

    Sse::new(mystream).keep_alive(KeepAlive::default())
}

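I can use `mystream.map()`, of course! Two things to watch: the handler's return type has to name `Event` instead of `String`, and `map` comes from a `StreamExt` trait (for example `tokio_stream::StreamExt`), which must be in scope. Mapping the inner `Result` also avoids the panic that `unwrap()` would cause when the receiver lags. Like this:

async fn sse_handler_as_string(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
    let rx = app_state.tx_as_string.subscribe();

    // Turn each Ok(String) into an Event; a lag error is passed through unchanged.
    let mystream = BroadcastStream::new(rx).map(|msg| msg.map(|s| Event::default().data(s)));

    Sse::new(mystream).keep_alive(KeepAlive::default())
}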
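
For anyone unsure what the "map inside the `Result`" step does, here is a minimal std-only sketch of the same idea. It is an analogy, not the axum API: a plain iterator stands in for the broadcast stream, and `SseEvent`, `RecvError`, and `to_events` are hypothetical names made up for illustration (stand-ins for `sse::Event` and `BroadcastStreamRecvError`).

```rust
// Hypothetical stand-in for axum's `sse::Event`: it only carries the data payload.
#[derive(Debug, Clone, PartialEq)]
pub struct SseEvent {
    pub data: String,
}

// Hypothetical stand-in for `BroadcastStreamRecvError`.
#[derive(Debug, Clone, PartialEq)]
pub enum RecvError {
    // The receiver fell behind and skipped this many messages.
    Lagged(u64),
}

// Convert every `Ok(String)` into `Ok(SseEvent)` and pass errors through untouched.
// The outer `map` walks the items; the inner `Result::map` only touches the `Ok` side.
pub fn to_events<I>(items: I) -> impl Iterator<Item = Result<SseEvent, RecvError>>
where
    I: Iterator<Item = Result<String, RecvError>>,
{
    items.map(|item| item.map(|data| SseEvent { data }))
}
```

Feeding it `Ok("a")`, `Err(Lagged(2))`, `Ok("b")` yields an event, the same error, and another event, in that order. Nothing panics on the error item, which is the difference from calling `unwrap()` inside the closure.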
