SSE Axum Backpressure on unfold() vs repeat_with().throttle()

Hi,
I just coded up an axum handler that streams the current time, as a learning exercise.
I ended up with two different solutions.
One uses unfold with a one-second sleep and await. The other uses repeat_with().throttle(one second).

Both worked, and then (to be honest) I asked ChatGPT whether there is any difference. It stated that I must use unfold with the sleep and await to avoid backpressure.

I think that's probably just made up and both do the same thing in the end, but I'm not sure.

Does anyone know if this statement about backpressure is correct?

Don't use this shit, especially not for things you don't already understand. It will poison your mind with wrong information. It doesn't matter whether you go fact-checking some stuff later, there will be other bullshit you just accept because it sounds very plausible or because it's presented as part of a conclusion that you verify to be correct (but the details aren't).

3 Likes

It's good that you're sceptical about the advice of the text generator and chose to ask here instead.

It depends on how the throttle is implemented. If it consumes an item of the stream and then holds on to it until the limit allows emission, then the time will be out of date. On the other hand, if it waits until the limit allows emission, and then fetches a new item and emits it, then it's fine and practically equivalent to the unfold version.
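
To make the two cases concrete, here is a minimal std-only simulation. It does not use any real throttle implementation: `Clock`, `fetch_then_wait`, `wait_then_fetch`, and `staleness` are hypothetical names made up for illustration, and "time" is just a counter of seconds that the code advances by hand.

```rust
// Simulated clock: "now" is a hand-advanced counter of seconds (an assumption
// for illustration, not a real timer).
struct Clock {
    now: u64,
}

// Hypothetical throttle A: pull the item first, then hold it for `period`.
// Returns (emission time, payload timestamp) pairs.
fn fetch_then_wait(clock: &mut Clock, period: u64, n: usize) -> Vec<(u64, u64)> {
    let mut out = Vec::new();
    for _ in 0..n {
        let item = clock.now; // timestamp captured when the item is pulled
        clock.now += period; // the throttle sits on the item
        out.push((clock.now, item));
    }
    out
}

// Hypothetical throttle B: wait for `period` first, then pull and emit at once.
fn wait_then_fetch(clock: &mut Clock, period: u64, n: usize) -> Vec<(u64, u64)> {
    let mut out = Vec::new();
    for _ in 0..n {
        clock.now += period; // wait until emission is allowed
        let item = clock.now; // timestamp captured right before emission
        out.push((clock.now, item));
    }
    out
}

// How old each payload is at the moment it is emitted.
fn staleness(events: &[(u64, u64)]) -> Vec<u64> {
    events.iter().map(|(emitted, payload)| emitted - payload).collect()
}

fn main() {
    let a = fetch_then_wait(&mut Clock { now: 0 }, 1, 3);
    let b = wait_then_fetch(&mut Clock { now: 0 }, 1, 3);
    println!("fetch-then-wait staleness: {:?}", staleness(&a)); // [1, 1, 1]
    println!("wait-then-fetch staleness: {:?}", staleness(&b)); // [0, 0, 0]
}
```

In this model the first variant always sends a time that is one period old, while the second sends a fresh one. Which of the two a given throttle adapter matches is something to check in its source or docs.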

4 Likes

Thank you, this makes sense. I have to check if this is the case.

But in general, neither of these two implementations has anything to do with preventing backpressure; that argument is what confuses me.
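
A rough way to picture this, as a sketch only: the snippet below uses the standard library's `std::iter::repeat_with` as a stand-in for an async stream, on the assumption that a stream is likewise pull-based (items are produced when the consumer polls for them). The helper name `produced_after_pulls` is made up for illustration.

```rust
use std::cell::Cell;

// Returns how many items the producer closure generated after the consumer
// pulled `pulls` times. `produced_after_pulls` is a hypothetical helper.
fn produced_after_pulls(pulls: usize) -> usize {
    let produced = Cell::new(0usize);
    // The closure runs only when `next()` is called, never ahead of time.
    let mut source = std::iter::repeat_with(|| {
        produced.set(produced.get() + 1);
        produced.get()
    });
    for _ in 0..pulls {
        let _ = source.next();
    }
    produced.get()
}

fn main() {
    println!("items produced with 0 pulls: {}", produced_after_pulls(0));
    println!("items produced with 3 pulls: {}", produced_after_pulls(3));
}
```

If streams behave like this, a slow consumer means fewer items get produced rather than a growing queue, whichever of the two constructions generates them.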

I recommend using rocket instead.

#[macro_use] extern crate rocket;

use chrono::Local;
use rocket::response::content::RawHtml;
use rocket::response::stream::{Event, EventStream};
use rocket::tokio::time::{sleep, Duration};

// Streams the current local time as a server-sent event once per second.
#[get("/time/stream")]
fn time_stream() -> EventStream![] {
    EventStream! {
        loop {
            // Read the clock right before yielding so each event is fresh.
            let now = Local::now().to_rfc3339();
            yield Event::data(now);
            sleep(Duration::from_secs(1)).await;
        }
    }
}

// RawHtml makes Rocket serve the page as text/html instead of text/plain.
#[get("/")]
fn index() -> RawHtml<&'static str> {
    RawHtml(r#"<html>
    <body>
      <h1>Time stream</h1>
      <div id="time">waiting...</div>
      <script>
        const evtSource = new EventSource('/time/stream');
        evtSource.onmessage = e => document.getElementById('time').textContent = e.data;
      </script>
    </body>
    </html>"#)
}

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index, time_stream])
}
2 Likes

Sir, there is no need to get angry and swear at others!

2 Likes